領先一步
VMware 提供培訓和認證,以加速您的進展。
瞭解更多這是部落格系列的第 3 部分,我們將在其中介紹 Spring Cloud Stream 應用程式的 Java 函數。
本系列的其他部分。
在本系列的過去兩篇部落格中,我們針對將所有現有的 Spring Cloud Stream App Starters 遷移到函數的新計畫,以及我們可以組合它們的各種方式,提供了一個一般介紹。在本篇部落格中,我們將繼續本系列,展示如何開發、測試這些函數,並使用它們來產生 Spring Cloud Stream 應用程式。特別是,在這裡我們將重點介紹如何編寫供應器函數(實作 java.util.function.Supplier
),然後為 Spring Cloud Stream 產生相應的來源應用程式。
為了將概念深植人心,我們將採用一個使用案例並實作一個解決方案來滿足它。
我們需要一個函數,當使用正確的配置調用時,可以提供我們部落格訂閱內容,格式為 Atom、RSS 等。我們需要支援兩種供應器調用模型 - 一種是以程式方式調用函數(例如,在 FaaS 環境中調用的 REST 端點),另一種是串流供應器,在其中我們可以在訂閱內容可用時立即獲得持續的訂閱內容流。我們希望基於 ROME 函式庫(一種用於訂閱內容聚合的熱門函式庫)來建構這些供應器。我們將從非 Spring 開發人員和 Spring 開發人員的角度來看待這個問題。
讓我們假設您不是 Spring 開發人員,並且不熟悉已經為 ROME 提供抽象化的 Spring Integration。在這種情況下,我們當然可以直接使用 ROME 來產生訂閱內容記錄。例如,這對於這種情況來說是一個有效的供應器。
public Supplier<SyndEntry> feedSupplier()
{
return () -> {
//Use the ROME framework directly to produce syndicated entries.
}
}
這裡的好處是,我們可以在不了解 Spring 的情況下開發供應器,並且可以使用該環境提供的抽象化或依賴 Spring Cloud Function 之類的框架,直接將其部署到無伺服器環境。
這基本上意味著,如果您是一位不具備太多 Spring Framework 技能的 Java 開發人員,您仍然可以使用僅在 java.util.function
套件中定義的介面(例如 Function
、Supplier
和 Consumer
)來編寫函數,並提供業務邏輯。然後,我們可以採用我們開發的此成品,並透過新增 Spring Cloud Stream 綁定器依賴項並將其變成 SpringBootApplication
,將其轉換為 Spring Cloud Stream 應用程式。透過提供一些配置屬性(例如中介軟體目的地),我們可以立即獲得在 Spring Cloud Data Flow 之類的平台上部署應用程式的附加價值,該平台將應用程式編排為資料管道的一部分。這樣,我們編寫的函數完全獨立於任何 Spring 依賴項,並且僅在部署旅程的最後階段才引入 Spring 元件,例如 Spring Cloud Stream、Spring Cloud Function 和 Spring Cloud Data Flow 等。下圖捕捉了這個想法。
正如我們所觀察到的,函數元件可以獨立調用,也可以在使其成為 Spring Cloud Stream 應用程式後,作為 Spring Cloud Data Flow 管道的一部分調用。
雖然上述模型可能是一個良好的起點,但當我們開始深入研究直接使用 ROME 框架時,我們可能會很快意識到這涉及大量繁重的工作和更深入的函式庫知識。犯錯的機率很高,因此我們需要編寫大量測試來驗證我們的自訂實作是否如預期般運作,以及是否涵蓋了所有邊角案例。我們開始想知道是否已經有更簡單的抽象化可用。這樣我們就不需要編寫任何 ROME 特定程式碼,因為抽象化層會處理所有複雜性。幸運的是,我們有一個解決方案。Spring Integration 為許多企業技術提供了許多入站和出站介面卡。Feed 介面卡 就是其中之一,其實作基於 ROME。事實上,我們在預先封裝的 串流應用程式 中提供的許多函數元件都基於 Spring Integration 介面卡。這些介面卡已被廣泛使用,並針對大量企業使用案例進行了嚴格的實戰測試。但是,我們想要編寫供應器的技術可能在 Spring Integration 中不可用。在這種情況下,正如我們在上面看到的,我們當然可以自行編寫程式碼並從供應器中調用它。
如果您尚未執行此操作,請 Fork 並 Clone 串流應用程式儲存庫。然後在 functions/supplier
下建立一個新的 feed-supplier
模組。使用現有的供應器之一作為範本來引導。
在專案中新增以下 Spring Integration Feed 介面卡依賴項。這將引入 Spring Integration 的 Feed 介面卡以及任何其他過渡性依賴項。
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-feed</artifactId>
</dependency>
現在我們已經有了核心依賴項,讓我們開始編寫一些程式碼。由於函數預期在 Spring Boot 環境中使用,因此我們需要建立一個 ConfigurationProperties
類別來驅動供應器函數的配置。以下是它的外觀。
package org.springframework.cloud.fn.supplier.feed;
@ConfigurationProperties("feed.supplier")
public class FeedSupplierProperties {
/**
* Key used in metadata store to avoid duplicate read from the feed
*/
private String metadataKey;
/**
* Feed url.
*/
private URL feedUrl;
// rest is omitted
}
正如我們所看到的,我們在所有屬性上都使用了 feed.supplier
的前綴。
接下來,讓我們建立一個基於 Spring 的配置類別,在其中提供所有必要的元件。我們將逐步建構它。以下是類別的基本結構。
package org.springframework.cloud.fn.supplier.feed;
...
@Configuration
@EnableConfigurationProperties(FeedSupplierProperties.class)
public class FeedSupplierConfiguration {
}
將這些欄位新增到類別中。
private final ConcurrentMetadataStore metadataStore;
private final Resource resource;
private final FeedSupplierProperties feedSuppplierProperties;
關於這些欄位的快速注意事項。Spring Integration 中的 Feed 介面卡提供了一種功能,可以不讀取我們已經從訂閱內容中讀取的相同條目。我們上面定義的 metadataKey
屬性就是用於此目的。它的運作方式是使用元數據儲存。有各種可用的元數據儲存適用於熱門資料庫。包含以下依賴項以用於記憶體中的簡單元數據儲存。
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>metadata-store-common</artifactId>
<version>${project.version}</version>
</dependency>
請注意,此需求是此供應器特有的,並非所有供應器都需要它。
如果沒有基於 HTTP(或 HTTPS)的 URL 可用(我們可以透過配置屬性設定),則使用者可以提供 Resource
Bean 來讀取訂閱內容。
讓我們新增一個建構子來使用這些欄位。
FeedSupplierConfiguration(FeedSupplierProperties feedSupplierProperties,
ConcurrentMetadataStore metadataStore,
@Nullable Resource resource) {
this.feedSuppplierProperties = feedSupplierProperties;
this.metadataStore = metadataStore;
this.resource = resource;
}
Resource
是可為 null 的,因為大多數情況下,我們可以簡單地將 URL 字串作為配置屬性傳遞,而無需提供 Resource
Bean。
Spring Integration Feed 介面卡提供 FeedEntryMessageSource,它是一個 MessageSource
實作。我們將在我們的供應器中使用此訊息來源。讓我們將其設定為 Spring Bean。下面的程式碼非常容易理解。
@Bean
public FeedEntryMessageSource feedEntryMessageSource() {
final FeedEntryMessageSource feedEntryMessageSource = this.resource == null ? new FeedEntryMessageSource(this.feedSuppplierProperties.getFeedUrl(),
this.feedSuppplierProperties.getMetadataKey()) :
...
return feedEntryMessageSource;
}
現在我們已經準備好了 MessageSource Bean,編寫一個簡單的供應器並透過調用供應器的 get
方法以程式方式調用它就相對容易了。這是它。
@Bean
public Supplier<Message<SyndEntry>> feedSupplier() {
return () -> feedEntryMessageSource().receive();
}
我們可以將此供應器 Bean 注入到我們的應用程式中,並以程式方式調用 get
方法。當此 Supplier
在 Spring Cloud Stream 應用程式中使用時(我們稍後會看到),它將使用 Spring Cloud Stream 提供的 預設輪詢器,預設情況下該輪詢器將每秒觸發供應器。此排程可以在輪詢器中更改。
非反應式輪詢解決方案看起來還不錯,但我們可能會問,如果我不想每隔一段時間就明確地輪詢,而是希望以串流方式在訊息來源中可用時立即取得資料,那該怎麼辦?嗯,我們有一個解決方案 - 開發一個反應式供應器,該供應器會在收到的資料可用時立即傳遞資料。讓我們看看詳細資訊。
在這裡,Spring Integration 再次提供了一些抽象化,我們可以像下面這樣使用這些抽象化將我們的 FeedEntryMessageSource
轉換為反應式發佈者。
@Bean
public Supplier<Flux<Message<SyndEntry>>> feedSupplier() {
return () -> IntegrationReactiveUtils.messageSourceToFlux(feedEntryMessageSource());
}
您可能會注意到,此供應器傳回的是 Flux<Message<SyndEntry>>
,而不是 Message<SyndEntry>
,就像最初的非反應式供應器中顯示的那樣,在其中我們依賴於程式方式調用供應器或其他輪詢機制。
好的,很高興我們有一個來自 Spring Integration 的 MessageSource
,並且我們可以將該實用方法用於將其轉換為 Flux
。如果沒有這樣的 MessageSource
,並且我們必須手動製作我們想要為其編寫反應式樣式供應器的系統的基本資料檢索,該怎麼辦?對於這些情況,我們可以使用 Project Reactor 提供的各種設施,然後以程式方式將資料饋送到它們。重點是,當我們編寫反應式串流供應器時,我們必須將資料作為 Flux
傳回。
讓我們為此反應式供應器新增一個單元測試。我們可以將 RFC 4287 - Atom Syndication Format 中描述的 Atom 訂閱內容範例用作我們的測試資料。將其包含在 src/test/resources
中。
這是測試類別。
@SpringBootTest(properties = {"feed.supplier.feedUrl=classpath:atom.xml",
"feed.supplier.metadataKey=feedTest" })
@DirtiesContext
public class FeedSupplierTests {
@Autowired
Supplier<Flux<Message<SyndEntry>>> feedSupplier;
@Test
public void testFromSampleRssFile() {
final Flux<Message<SyndEntry>> messageFlux = feedSupplier.get();
StepVerifier.create(messageFlux)
.assertNext((message) -> {
assertThat(message.getPayload().getTitle().trim()).isEqualTo("Atom draft-07 snapshot");
assertThat(message.getPayload().getContents().size()).isEqualTo(1);
assertThat(message.getPayload().getContents().get(0).getValue().contains("The Atom draft is finished.")).isTrue();
})
.thenCancel()
.verify();
}
@SpringBootApplication
static class FeedSupplierTestApplication {
}
}
函數專案在 Maven BOM 中聚合了所有可用的函數。將 feed-supplier
新增到此 BOM。如果您要基於此函數產生 Spring Cloud Stream 應用程式,則主要需要此操作。
在此過程的這一點上,我們可以向儲存庫提交提取請求,但如果我們想要從供應器產生基於 Spring Cloud Stream 綁定器的應用程式,請繼續閱讀。產生後,這些應用程式可以獨立執行,也可以作為 Spring Cloud Data Flow 中的資料編排管道的一部分執行。
繼續並在 applications/source
下建立一個名為 feed-source
的新模組。正如我們在之前的部落格中提到的,java.util.function.Supplier
被對應為 Spring Cloud Stream 來源。
我們不需要在我們的 Feed 供應器之上新增任何自訂程式碼,因為它可以按原樣使用。但是,既然我們正在討論 Spring Cloud Stream 應用程式,我們就需要使用 測試綁定器 與供應器函數,以查看供應器如何與 Spring Cloud Stream 協同工作。
我們可以將 現有的來源 之一用作範本,以引導我們完成整個過程。我們甚至可以複製其中一個並逐步進行更改。
所有應用程式都使用父 POM stream-applications-core,它引入了所有必要的測試依賴項,例如上面提到的測試綁定器。它還為應用程式產生器外掛程式提供了基礎架構,該外掛程式負責產生基於綁定器的應用程式。
我們想要強調的一點是,除非應用程式模組包含自訂程式碼,否則此模組僅成為應用程式產生器,用於產生基於綁定器的應用程式。換句話說,我們不會向其新增帶有 @SpringBootApplication
的類別,而是為我們產生該類別。
新增以下依賴項以使用測試綁定器進行測試
<dependencies>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>feed-supplier</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
現在我們可以新增一個測試來驗證 feed-supplier
是否與 Spring Cloud Stream 中的測試綁定器協同工作。基本上,我們需要確保供應器透過測試綁定器產生資料,並將其傳遞到測試綁定器上的目的地。
以下是測試背後的總體思路
public class FeedSourceTests {
@Test
public void testFileSource() throws Exception {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(FeedSourceTestApplication.class))
.web(WebApplicationType.NONE)
.run("--spring.cloud.function.definition=feedSupplier", "--feed.supplier.feedUrl=classpath:atom.xml", "--feed.supplier.metadataKey=feedTest")) {
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> sourceMessage = target.receive(10000);
Object title = JsonPath.parse(new String(sourceMessage.getPayload())).read("$.title");
assertThat(title).isEqualTo("Atom draft-07 snapshot");
}
}
@SpringBootApplication
@Import(FeedSupplierConfiguration.class)
public static class FeedSourceTestApplication {
}
}
該測試與我們為供應器新增的單元測試非常相似,但有一個很大的不同。在供應器中,我們直接調用它並驗證產生的資料。在這裡,我們沒有直接調用供應器,而是 Spring Cloud Stream 中的綁定機制為我們自動執行此操作。我們正在從出站目的地接收資料,然後驗證該資料。
一旦測試通過,就該產生應用程式了。
預設情況下,外掛程式會為 Spring Cloud Stream 中的 Kafka 和 Rabbit 綁定器產生應用程式。這在 stream-applications-core
中的父 POM 中配置。如果我們需要自訂不同綁定器的產生,我們需要在那裡進行這些更改。以下是應用程式產生器外掛程式的配置。
<plugin>
<groupId>org.springframework.cloud.stream.app.plugin</groupId>
<artifactId>spring-cloud-stream-app-maven-plugin</artifactId>
<configuration>
<generatedApp>
<name>feed</name>
<type>source</type>
<version>${project.version}</version>
<configClass>org.springframework.cloud.fn.supplier.feed.FeedSupplierConfiguration.class</configClass>
</generatedApp>
<dependencies>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>feed-supplier</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>stream-applications-composite-function-support</artifactId>
<version>${stream-apps-core.version}</version>
</dependency>
</dependencies>
</configuration>
</plugin>
讓我們快速瀏覽一下這裡的一些詳細資訊。我們正在請求外掛程式建立一個名為 feed-source
的應用程式,並希望它使用我們上面開發的 Supplier
作為主要配置類別。在外掛程式的 dependencies 區段中,我們還需要新增應用程式所需的任何依賴項,在本例中為 feed-supplier
。我們需要在所有產生的來源應用程式中新增我們所有的處理器函數。這是因為我們可以將來源與其他處理器組合,而無需它們像我們在之前的部落格中看到的那樣作為個別的微服務執行。有關使用處理器進行函數組合的更多詳細資訊,請參閱此處。這就是為什麼我們在外掛程式的 dependencies 區段中新增了依賴項 stream-applications-composite-function-support
。
建置應用程式模組,我們將在 apps
資料夾中看到基於綁定器的應用程式。它們將被命名為 feed-source-kafka
和 feed-source-rabbit
。我們可以轉到這些應用程式中的任何一個並建置它,然後將其用作獨立應用程式或作為 Spring Cloud Data Flow 上的管道的一部分使用。
在這篇部落格文章中,我們看到了開發、測試和貢獻供應器/Spring Cloud Stream 應用程式組合的整個過程。請遵循此處列出的步驟來編寫您自己的供應器和來源。如果您已完成,請考慮將它們貢獻回儲存庫。
這篇部落格是系列文章的第三篇,將涵蓋許多相關主題。在未來幾週內,敬請期待更多深入探討和重點主題。在本系列的下一篇部落格中,與我們在這篇文章中關於編寫新的供應器和來源所做的工作類似,我們將編寫一個消費者函數,然後從中產生一個 Sink 應用程式。