雲端事件與 Spring - 第 2 部分

工程 | Oleg Zhurakousky | 2020 年 12 月 23 日 | ...

簡介

我們先快速總結一下先前的文章

  • Message 是一種足夠的結構和抽象概念,可用於在 Spring 的上下文中使用代表雲端事件的資料。 我們希望這很清楚。
  • 在 Spring 中,我們致力於隔離功能與非功能關注點,讓我們可以在框架層級處理非功能方面(例如傳送、接收、重試、連線、轉換等),讓您(主要)專注於實際的業務邏輯,並讓您的程式碼保持簡單,且可插入到各種執行環境(稍後詳述)。

業務問題

如承諾的,這篇文章更具技術性,因為它涵蓋了您可以嘗試的具體範例。 因此,我們首先描述我們將涵蓋的三個用例。 實際上,用例是相同的,但執行環境各不相同。

“接收代表要聘僱的人員的資料,產生員工記錄。”

三種不同的變體在於執行環境(典型的非功能關注點的範例)

  • HTTP 請求/回應
  • 從 AMQP 到 Apache Kafka
  • 從 RSocket 到 Apache Kafka。

用例和執行環境都不是真正新的或獨特的。 在 Spring 中,我們已經處理它們數十年了,有數千個應用程式在生產環境中執行。 那麼,新增雲端事件上下文會改變什麼嗎? 換句話說,如果輸入和輸出資料代表雲端事件,會改變什麼嗎? 這些是我們試圖在這篇文章中回答的問題。

這些範例的用戶程式碼是

@SpringBootApplication
public static class SampleApplication
  public static void main(String[] args) throws Exception {
    SpringApplication.run(SampleApplication.class, args);
  }

  @Bean
  public Function<Person, Employee> hire() {
    return person -> {
	Employee employee = new Employee(person);
	return employee;
    };
  }
}

是的,這有點無聊,因為它沒有顯示任何非功能方面,因為它們是由特定於執行環境的框架處理的。 我們還保持了函數實作細節的簡單性,因為它們與主題無關。 框架並不真正關心您做什麼。 它只關心您的期望 - 輸入 - 以及您產生的 - 輸出 - 並且該資訊可從簽章中獲得。

用例 1(透過 HTTP)

此範例的完整原始碼可在 Spring Cloud Function 範例中找到。 在其中,我們將雲端事件作為 HTTP 請求傳送,並期望收到雲端事件作為 HTTP 回應。 這意味著,我們的 hire() 函數需要以某種方式成為 HTTP 端點。 我們可以使用 Spring Cloud Function 框架來做到這一點。 透過新增其 spring-cloud-function-web 依賴項,我們新增 Spring Boot 自動配置和元件,以將我們的函數轉換為 HTTP 端點。 配置選項和預設值超出本文的範圍,但您可以從 Spring Cloud Function 文件的相關章節中取得它們。 重要的是,基於這些預設值,函數的名稱成為在 localhost 連接埠 8080 上執行的 URL 路徑的一部分,從而產生 https://127.0.0.1:8080/hire 端點。

現在您可以啟動應用程式並發佈到其中。 應用程式執行後,您可以使用以下命令 curl

curl -w'\n' localhost:8080/hire \
 -H "Content-Type: application/json" \
 -d '{"firstName":"John", "lastName":"Doe"}' -i

您應該收到以下回應

. . .
{"person":{"firstName":"John","lastName":"Doe"},"id":172,"message":"Employee 172 was hired on 17-12-2020"}

嗯。 。 。 。 這真的與雲端事件無關! 對吧...?

正確,但是框架將函數公開為 REST 端點、處理類型轉換、調用和其他非功能方面的能力是清晰的,並且與雲端事件直接相關。 繼續閱讀。 。 。

此類啟用的核心是 Message - 一種結構和類型,可讓傳入的 HTTP(或任何其他)請求採用規範形式,以便其他框架可以以統一的方式處理其內容,無論其來源或目的地如何。

但是等等,雲端事件呢?

讓我們透過新增代表所需雲端事件屬性的 HTTP 標頭,將此 HTTP 請求轉換為雲端事件。 請注意,這些標頭的前綴為 ce-,這是雲端事件規範的 HTTP 協定繫結 部分所要求的。

curl -w'\n' localhost:8080/hire \
 -H "ce-id: 0001" \
 -H "ce-specversion: 1.0" \
 -H "ce-type: hire" \
 -H "ce-source: spring.io/spring-event" \
 -H "Content-Type: application/json" \
 -d '{"firstName":"John", "lastName":"Doe"}' -i

執行後,您將看不到任何差異。 您的函數以相同的方式運作,並且您收到相同的回應。

當然,除非您查看並分析回應標頭,該標頭現在包含所需的雲端事件屬性(儘管與請求中的屬性不同)

ce-source: https://spring.dev.org.tw/cloudevent
ce-specversion: 1.0
ce-type: sample
ce-id: 76208faf-f8e5-4267-9028-bb4392d66765
message-type: cloudevent
timestamp: 1608211771624
Content-Type: application/json
Transfer-Encoding: chunked
Date: Thu, 17 Dec 2020 13:29:31 GMT
{"person":{"firstName":"John","lastName":"Doe"},"id":171,"message":"Employee 171 was hired on 17-12-2020"}

但是如何做到的?

在這一部分中,我們再次提醒您,我們致力於將非功能方面外包給框架,因為這是其中之一。 因此,預設情況下(由框架建立)我們假設,如果請求是雲端事件,則預期回應也應為雲端事件。 您也可以看到四個必需的雲端事件屬性具有也是透過遵循框架建立的某些預設規則而產生的值。 specversion 預設為 1.0type 預設為傳回物件的類型名稱,id 預設為產生的 UUID(以提供合理安全的唯一性期望),source 預設為 https://spring.dev.org.tw/

但是我不喜歡預設值。 我想要我自己的,並且我想新增其他屬性?

正如我們在先前的文章中提到的:「我們還公開了實用程式、程式庫和配置選項,讓您可以影響某些非功能性問題,因為由於各種原因,這樣做可能仍然是必需的。」 在這裡,您有兩個選項。 第一個選項:您可以變更函數簽章並傳回 Message<Employee>,您可以在其中新增其他中繼資料(即雲端事件屬性)。 一旦框架看到您傳回了 Message,它就不會嘗試對使用者新增的中繼資料執行任何額外操作。 這實際上是適用於大多數(如果不是全部)依賴 Spring Messaging 的框架的規則。 雖然此選項很簡單,但它會將非功能方面洩漏到您的業務邏輯中。 畢竟,您需要建立 Message 的實例,您需要新增代表雲端事件屬性的標頭(最好具有正確的 - 規範要求的 - 屬性前綴),依此類推。 但是此選項的最大缺陷是它會要求您變更函數的簽章並將功能和非功能方面混合在一起,這顯然違反了關注點分離規則。 但是,為了討論起見,以下是您將如何執行該操作

@Bean
public Function<Message<Person>, Message<Employee>> hire() {
  return message -> {
    Person person = message.getPayload();
    Employee employee = new Employee(person);
      return CloudEventMessageBuilder.withData(employee).setId("123456")
	.setSource(URI.create("https://spring.cloudevenets.sample")).build();
  };
}

範例原始碼包含它的註解版本。

第二個選項:您可以提供名為 CloudEventHeaderEnricher 的策略的實作,該策略提供一個單獨的位置,您可以在其中實作邏輯,以產生輸出的適當屬性和標頭。 框架在產生輸出 Message 時會調用此策略。 以下範例顯示了此策略的可能實作(也在範例中註解掉,因此取消註解、重新啟動應用程式並查看差異)。

@Bean
public CloudEventHeaderEnricher cloudEventEnricher() {
  return messageBuilder -> messageBuilder.setSource("https://spring.dev.org.tw/cloudevent")
	.setType("sample").setId("987654");
}

在這裡,您還可以查看可用於協助您建置雲端事件訊息的實用程式類別之一:CloudEventMessageBuilder。 它是以標準 Spring MessageBuilder 為模型,但具有雲端事件特定的設定器。 但是,此方法的主要優勢是關注點分離。 您的業務邏輯(您的功能程式碼)保持清潔。 此外,您仍然需要編寫的非功能程式碼是在單獨的位置編寫的。

還有一件事... 範例程式碼假設您只對 Cloud Event 的 data 部分感興趣,並且希望它是 POJO 的形式。但如果情況並非如此呢?如果您想要 Cloud Event 中的完整視圖呢?或者,如果您還想要 Cloud Event 資料的原始形式(即 byte[])呢?如前所述,框架從函數的簽名中獲得指令。因此,透過將您的輸入和輸出類型宣告為 Message,您實際上是在指示框架提供整個 Cloud Event 給您(而不僅僅是它的 data)。此外,透過指定 Message 的泛型類型,您可以指示框架將 Cloud Event 的 data 部分作為該 Java 類型提供,實際上是請求它執行類型轉換(如有必要)。所以,繼續嘗試以下簽名:public Function<Message<byte[]>, Message<Employee>> hire() {...}public Function<byte[], Employee> hire() {...} 或其他。

目前大概就這樣了。README 檔案和原始碼中的註解也在需要時提供額外的指示。

用例 2 (從 AMQP 到 Kafka)

該範例的完整原始碼可在 Spring Cloud Function 範例中找到。它假設您對 AMQP 和 Apache Kafka 具有一定程度的熟悉度。 在此範例中,我們使用 RabbitMQ(作為 AMQP 訊息代理)和 Apache Kafka。

雖然此用例可能看起來比前一個用例更複雜,但本節和後續章節(第三個用例)實際上非常簡短。這是因為前一節中解釋的所有內容也適用於此處。事實上,我們在這裡唯一改變的是執行環境。我們透過相同的機制來做到這一點:新增基於 Spring Boot 的相關自動組態。因此,在本例中,我們新增了兩個自動組態:一個用於 RabbitMQ(AMQP 訊息代理)綁定器,另一個用於 Spring Cloud Stream 框架中可用的 Apache Kafka 綁定器。還有一些額外的應用程式配置(您可以在 application.properties 檔案中看到),用於指示框架如何將 hire 函數的輸入端綁定到 RabbitMQ(透過 RabbitMQ 綁定器),以及將輸出端綁定到 Apache Kafka(透過 Apache Kafka 綁定器)。

假設您已執行 RabbitMQ 和 Kafka,請啟動應用程式並將訊息傳送到 RabbitMQ。您可以使用 RabbitMQ 儀表板(如果您已安裝)並將訊息傳送到 hire-in-0 交換器。
為了保持與 Cloud Event 規範的一致性,您應提供具有 AMQP 適當前綴(即 cloudEvents:)的屬性。考慮以下範例

cloudEvents:specversion=1.0
cloudEvents:type=hire
cloudEvents:source:spring.io/spring-event
cloudEvents:id=0001

然後考慮以下資料:{"firstName":"John", "lastName":"Doe"}

為了簡化此示範部分,我們包含了一個 測試案例,透過將 Cloud Event 傳送到 RabbitMQ 並從 Apache Kafka 接收一個 Cloud Event,有效地自動化此示範。

Message<byte[]> messageToAMQP = CloudEventMessageBuilder
	.withData("{\"firstName\":\"John\", \"lastName\":\"Doe\"}".getBytes())
	.setSource("https://cloudevent.demo")
	.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
	.build(CloudEventMessageUtils.AMQP_ATTR_PREFIX);

rabbitTemplate.send("hire-in-0", "#", messageToAMQP);
Message<String> resultFromKafka = queue.poll(2000, TimeUnit.MILLISECONDS);
System.out.println("Result Message: " + resultFromKafka);
. . .

請注意我們如何在此處使用 CloudEventMessageBuilder 僅將 source 設定為 Cloud Event 屬性,同時依靠其餘所需 Cloud Event 屬性的預設值。我們還使用 build(CloudEventMessageUtils.AMQP_ATTR_PREFIX) 來確保屬性以 cloudEvents: 前綴開頭(請參閱 Cloud Events AMQP 協定綁定)。另請注意,在接收端,Cloud Events 屬性現在以 ce_ 前綴開頭(請參閱 Cloud Events Kafka 協定綁定),因為框架已確定目標目的地是 Apache Kafka。最後一點值得詳細說明一下。我們已經確定設定 Cloud Event 屬性是非功能性的,因此,我們已經公開了一種機制,讓您可以處理業務邏輯之外的事情。但是屬性前綴呢?請注意,我們在不同的執行環境中執行相同的程式碼。這表示屬性前綴實際上取決於執行環境。因此,透過了解執行環境,框架可確保 Cloud Event 屬性前綴的正確性。

在這裡,我們依賴 Spring Cloud Stream 框架及其預設值,例如目的地自動佈建(Kafka 和 Rabbit)、綁定名稱、連線等等。這些預設值和配置選項的詳細資訊超出了本文的範圍,因為它們與 Cloud Events 無關。有關框架本身及其配置選項的更多詳細資訊,請參閱 Spring Cloud Stream 文件

此外,與上一個範例一樣,此範例也包含註解變體,歡迎您使用這些變體進行實驗。

用例 3 (從 RSocket 到 Kafka)

該範例的完整原始碼可在 Spring Cloud Function 範例中找到。它假設您對 RSocket 和 Apache Kafka 具有一定程度的熟悉度。本節應該比上一節更短,因為它非常相似。但是,這裡有一些有趣的變體值得討論。嗯,最明顯的是 RSocket。我們正在引入一種不同的傳遞機制。但真正讓它更有趣的是,沒有為 RSocket 定義協定綁定。我們可以選擇遵守 Kafka、HTTP 或 AMQP 規範之一,或者我們可以在結構化模式下傳達 Cloud Event,其中整個事件都編碼到某種結構(例如 JSON)中。

一些實作細節也與此範例中的其他用例不同。但是,這些細節與 Cloud Event 沒有任何關係。相反,它們是您可以使用的其他機制的示範。例如,我們使用 Consumer 而不是 Function,並手動使用 Spring Cloud Stream 框架提供的 StreamBridge 元件傳送輸出訊息。

因此,不用再多說,這是我們的應用程式程式碼

@Bean
public Consumer<Person> hire(StreamBridge streamBridge) {
  return person -> {
    Employee employee = new Employee(person);
    streamBridge.send("hire-out-0", CloudEventMessageBuilder.withData(employee)
	.setSource("https://spring.dev.org.tw/rsocket")
	.setId("1234567890")
	.build());
  };
}

請注意我們如何使用 CloudEventMessageBuilder 將輸出 Message 產生為 Cloud Event。

我們將以 JSON 編碼的 Cloud Event 的結構化表示形式,透過 RSocket 傳送到 hire() 函數

String payload = "{\n" +
	"    \"specversion\" : \"1.0\",\n" +
	"    \"type\" : \"org.springframework\",\n" +
	"    \"source\" : \"https://spring.dev.org.tw/\",\n" +
	"    \"id\" : \"A234-1234-1234\",\n" +
	"    \"datacontenttype\" : \"application/json\",\n" +
	"    \"data\" : {\n" +
	"        \"firstName\" : \"John\",\n" +
	"        \"lastName\" : \"Doe\"\n" +
	"    }\n" +
	"}";

rsocketRequesterBuilder.tcp("localhost", 55555)
	.route("hire")        // target function
	.data(payload).       // data we're sending
	.send()

預期的輸出應與先前的用例相似,因為目標目的地相同。

結論

如您所見,在 Spring 環境中處理 Cloud Events 時,您有多種選擇

  • 您可以選擇僅關心 Cloud Event 的內容,同時保持對外發 Cloud Event 外觀的完全控制。
  • 您可以選擇透過 Message 本身處理 Cloud Event,並依靠提供的公用程式來簡化對 Cloud Event 特定資料的存取。
  • 您可以選擇執行環境而不影響您的業務邏輯(使用者程式碼),同時委派框架來確保某些 Cloud Event 規格的正確性,例如屬性前綴。

這些只是與本文相關的一些,但還有更多。

已建立和經過驗證的模式、實作這些模式的框架,以及分層且有主見的 Spring Boot 自動組態使這一切成為可能。分層很重要,因為它們可讓您將問題劃分為可在其他存在相同問題的專案和整合中重複使用的解決方案。這有效地使當前的 Cloud Event 整合變得相當簡單,因為大多數與 Cloud Event 無關的非功能性方面(即,連線、傳送、接收、轉換、重試等)已經由 Spring Cloud Function 和 Spring Cloud Stream 背後的各個框架解決。

最後但並非最不重要的一點是,還有另一種處理 Cloud Events 和 Spring 的方法,那就是透過 Cloud Events Java SDK,您也可以在那裡找到一個 範例

取得 Spring 電子報

隨時掌握 Spring 電子報的最新資訊

訂閱

取得領先

VMware 提供培訓和認證,以加速您的進度。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔。

了解更多

即將到來的活動

查看 Spring 社群中所有即將到來的活動。

查看全部