領先一步
VMware 提供培訓和認證,以加速您的進展。
了解更多事件驅動架構很棒。但是如果沒有框架,編寫與流行的事件訊息傳遞平台協作所需的基本架構可能會很混亂。在這篇文章中,我們將看看如何使用 Spring Cloud Stream 來簡化您的程式碼。
您只想為您的事件驅動應用程式編寫邏輯,但是樣板訊息程式碼可能會妨礙您。將您的應用程式連接到訊息服務很棘手,如果您是企業開發人員,您可能需要使用多種訊息傳遞技術(無論是內部部署還是雲端)。
讓彈性的訊息傳遞抽象化處理複雜的訊息傳遞平台整合,以便您可以專注於編寫簡單乾淨的業務邏輯。Spring Cloud Stream 是一個絕佳的選擇。它在一個易於使用的 API 後面統一了許多流行的訊息傳遞平台,包括 RabbitMQ、Apache Kafka、Amazon Kinesis、Google PubSub、Solace PubSub+、Azure Event Hubs 和 Apache RocketMQ。它甚至消除了這些平台之間方法和功能上的任何細微差異(例如分區或交換器),讓您可以自由創建創新的事件驅動解決方案。
在接下來的示範中,您將確切地看到 Spring Cloud Stream 的巧妙抽象化如何幫助使事件串流程式碼更乾淨且更容易使用。您還將看到使用 Spring Cloud Stream 的 binding
函式庫在兩個不同的訊息傳遞平台(RabbitMQ 或 Kafka)之間切換是多麼容易。
這些事件驅動微服務需要在您的 PC 上安裝最新版本的這些應用程式1
首先,從 GitHub 克隆程式碼儲存庫。若要執行此操作(如果您已安裝 Git),請開啟一個新的終端機視窗並發出以下命令。如果您沒有安裝 Git,請下載並解壓縮此 zip 檔案。
git clone https://github.com/benwilcock/spring-cloud-stream-demo.git
檢查程式碼後,您會注意到此儲存庫包含兩個微服務。
Loansource
微服務(在 /loansource
資料夾中)。此微服務充當事件訊息的來源。這些事件是 Loan
貸款申請,類似於您在銀行和金融領域看到的情況。每筆貸款都有一個「名稱」、「金額」和「狀態」(最初設定為 PENDING
)。
Loancheck
微服務(在 /loancheck
資料夾中)。此微服務充當 Loan
處理器。它檢查哪些貸款是好的貸款,並將它們分類為 APPROVED
或 DECLINED
狀態。
若要執行示範,請按照以下指示。
在一個全新的終端機視窗中,前往專案的根資料夾並發出以下命令。
您需要安裝並執行 「Docker」,此腳本才能正常運作,因為它需要
docker-compose
。
./start-servers.sh
此腳本將啟動 Kafka 和 RabbitMQ,並將兩者的日誌輸出串流到終端機視窗(除非您使用 Ctrl-C
退出)。當您按下 Ctrl-C
時,伺服器不會停止 - 它們將繼續在背景執行。啟動後,這些伺服器將可供您電腦上執行的應用程式使用。
在接下來的步驟 3 和 4 中,我們必須將 -P<設定檔選擇>
替換為我們想要使用的訊息傳遞平台的名稱。
-Pkafka
-Prabbit
如果您完全省略 -P<設定檔選擇>
設定,則會使用 Kafka。
注意:此示範並非設計用於「橋接」Kafka 和 RabbitMQ 之間的訊息,因此請務必在編譯和執行這兩個應用程式時選擇相同的設定檔名稱。如果您的目標是橋接訊息傳遞系統,請參閱此處的文件。
在一個新的終端機視窗中,使用 cd
將 /loansource
目錄設為目前目錄,然後發出以下命令,將 <設定檔選擇>
替換為您想要執行的模式(如上述步驟 2 中討論的 kafka
或 rabbit
模式)。
./mvnw clean package spring-boot:run -DskipTests=true -P<profile-choice>
一旦 loansource
應用程式啟動,在終端機視窗中,您應該每秒看到一則訊息,告訴您已將新的貸款事件以 PENDING
狀態發布到訊息傳遞平台。讓此微服務保持執行,然後繼續進行下一步。
在另一個新的終端機視窗中,將 /loancheck
目錄設為您的目前目錄,然後發出以下命令,再次將 <設定檔選擇>
替換為您想要執行的模式。
./mvnw clean package spring-boot:run -DskipTests=true -P<profile-choice>
一旦 loancheck
應用程式啟動,在終端機視窗中,您應該每秒看到一則訊息,告訴您已從訊息傳遞平台讀取新的 PENDING
貸款申請,並將其設為 APPROVED
或 DECLINED
。如果您想了解這些應用程式是如何建構的,請跳到「運作方式」。
一旦您完成微服務的操作,請在 /loansource
和 /loancheck
微服務的每個終端機視窗中按下 Ctrl-C
。應用程式將停止運行,事件處理將停止。
如果您正在 Kafka 和 Rabbit 之間切換模式,只需回到步驟 2 並重複該過程。
如果您完全完成示範,並且也想停止 Kafka 和 RabbitMQ 伺服器,請在專案根資料夾的終端機視窗中執行
./stop-servers.sh
腳本。如果您只是在模式之間切換,則不需要這樣做。
Maven 設定檔(在每個專案的 pom.xml
中)控制在建置時,哪些 Spring Cloud Stream 綁定會作為依賴項新增。當您選擇 -Pkafka
時,[spring-cloud-stream-binder-kafka][kafka]
依賴項會新增到專案中。當您選擇 -Prabbit
時,[spring-cloud-stream-binder-rabbit][rabbit]
依賴項會新增。
<profiles>
<profile>
<id>kafka</id>
<properties>
<spring.profile.activated>kafka</spring.profile.activated>
</properties>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${spring-cloud-stream.version}</version>
</dependency>
</dependencies>
</profile>
...
<profiles>
您選擇的 Maven 設定檔也會影響 src/main/resources/application.properties
檔案中的 spring.profiles.active
屬性,該屬性會切換您在啟動時看到的橫幅。
對於 Loansource 微服務,我們正在使用 Spring Cloud Stream v2.1 的新功能 - Spring Cloud Function 支援。有了這個新功能,要使 LoansourceApplication
微服務充當 Loan
訊息的來源,只需宣告一個 @Bean
方法,該方法產生並返回 Supplier<>
。在這種情況下,它是 Loan
類型的 Supplier
。函式方法程式碼如下所示...
@Bean
public Supplier<Loan> supplyLoan() {
return () -> {
Loan loan = new Loan(UUID.randomUUID().toString(), "Ben", 10000L);
LOG.info("{} {} for ${} for {}", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
return loan;
};
}
Supplier<>
是一種 Java 函式資料類型。由於只有一個 @Bean
方法返回此類型,因此 Spring Cloud Stream 確切地知道下一步該怎麼做。預設情況下,它將每秒觸發此函式一次,並將結果傳送到名為「output」的預設 MessageChannel
。這個函式方法的好處在於它只包含業務邏輯,因此您可以使用常規單元測試對其進行測試。
我們可以使用
application.properties
檔案中的spring.cloud.function.definition
屬性來明確宣告我們想要綁定到綁定目標的函式 bean - 但對於您只定義單個@Bean
的情況,這不是必要的。
如果我們想要使用不同的輪詢間隔,我們可以使用
application.properties
檔案中的spring.integration.poller.fixed-delay
屬性。
loancheck
微服務需要稍微多一點的程式碼,但不多。它的工作是將 Loan
事件分類到不同的通道中。為了做到這一點,它訂閱來自來源的 output
主題的事件,然後根據貸款的價值將它們發送到 approved
或 declined
主題中,這類似於欺詐檢查機制。
因為我們正在使用 3 個訊息傳遞通道(一個入站和兩個出站),所以使用一個簡單的 LoanProcessor
介面來闡明輸入和輸出。目前,它看起來像這樣
@Component
public interface LoanProcessor {
String APPLICATIONS_IN = "output"; // Topic where the new loans appear
String APPROVED_OUT = "approved"; // Topic where the approved loans are sent
String DECLINED_OUT = "declined"; // Topic where the declined loans are sent
@Input(APPLICATIONS_IN)
SubscribableChannel sourceOfLoanApplications();
@Output(APPROVED_OUT)
MessageChannel approved();
@Output(DECLINED_OUT)
MessageChannel declined();
}
LoanProcessor
介面首先在 @SpringBootApplication
類別 (LoanCheckApplication.java
) 中作為 @EnableBinding()
註解的參數被引用,如下所示。
@SpringBootApplication
@EnableBinding(LoanProcessor.class)
public class LoanCheckApplication {
public static void main(String[] args) {
SpringApplication.run(LoanCheckApplication.class, args);
}
}
此外,一個名為 LoanChecker.java
的 Spring @Component
在運行時使用此 LoanProcessor
建構。此外,每當新的 Loan
事件到達時,都會自動調用此元件的 checkAndSortLoans(Loan)
方法,因為它已註解為 LoanProcessor.APPLICATIONS_IN
通道的 @StreamListener()
。您可以在以下程式碼範例中看到此註解的使用方式。
@StreamListener(LoanProcessor.APPLICATIONS_IN)
public void checkAndSortLoans(Loan loan) {
if (loan.getAmount() > MAX_AMOUNT) {
loan.setStatus(Statuses.DECLINED.name());
processor.declined().send(message(loan));
} else {
loan.setStatus(Statuses.APPROVED.name());
processor.approved().send(message(loan));
}
}
然後,此方法使用簡單的業務邏輯對 Loan
物件進行排序。根據排序的結果,它將它們發送到 processor.approved()
通道或 processor.declined()
通道(在相應地設定其貸款狀態之後)。
如您所見,使用 Spring Cloud Streams 時獲得的關注點分離確實非常健康。在任何一個微服務中,絕對沒有特定於 Kafka 或 RabbitMQ 的程式碼。這使我們能夠專注於業務邏輯,而無需考慮訊息傳遞平台,並且您可以透過簡單地更改專案 pom.xml
中的「綁定器」依賴項來輕鬆交換訊息傳遞平台。
您可以透過以下方式查看事件在訊息傳遞平台中的流動
對於 Kafka,可以使用 KafDrop 工具在 localhost:9000
上觀察主題和事件訊息。無需登入。
對於 RabbitMQ,可以在 localhost:15672
上找到 Rabbit 管理控制台,可用於觀察交換器和事件訊息。若要登入,使用者名稱為 guest
,密碼也為 guest
。若要觀察實際的訊息內容,您可能需要手動建立一個佇列,並使用 #
作為您的 routing key
路由金鑰將其綁定到所需的主題。
若要隨時掌握 Spring Cloud Stream 的最新資訊,請訪問 Spring 網站上專案的專用專案頁面。
若要從頭開始建立您自己的 Spring 專案,請使用 start.spring.io 上的專案配置器。
如果您想深入了解 Spring 和純 Kafka,請查看這些精彩的部落格文章