測試 Spring Cloud Stream 應用程式 - Part 1

工程 | David Turanski | 2020年12月15日 | ...

這篇文章是部落格系列文章的一部分,探討基於 Java Function 的全新設計 Spring Cloud Stream 應用程式。本集分為兩部分,探討用於測試實作串流應用程式的 Function 的策略。我們將特別關注與外部資源整合的 Function,這帶來了額外的測試挑戰。大多數的預先封裝的來源和 Sink 應用程式都是這種情況。為了說明這一點,我們將逐步介紹一個範例 couchbase-sink 應用程式。在第 1 部分中,我們將重點關注 Sink 所基於的核心 Function。在第 2 部分中,我們將著眼於為應用程式編寫測試。

以下是本部落格系列中所有先前的文章。

測試考量

Function 和應用程式

對於基於 Function 的串流應用程式,核心功能會公開為 Function。預先建置的 Spring Cloud Stream 應用程式的核心 Function 會封裝為個別元件,以便任何應用程式都可以使用它們,而與 Spring Cloud Stream 無關。Spring Cloud Stream 原生支援 Java Function,並將繫結至任何實作核心 java.util.function 類型之一的 Bean:ConsumerSupplierFunction。Function 作為一個獨立的元件來看,不需要依賴 Spring 或任何其他東西。如果您在包含 Spring Cloud Stream Binder 作為依賴項的任何應用程式中註冊任何 Function 作為 Bean,Spring Cloud Stream 會將其繫結至已設定的訊息目的地。

在資料管線中,資料串流源自 Source 並流入 Sink,中間有零個或多個處理步驟。實際上,Source 充當來自某些外部資源 (例如資料儲存區、任何支援標準協定的服務或訊息代理程式) 的資料 Supplier。Sink 充當某些其他外部資源的資料 Consumer。由於 Spring 為大多數常用的外部資源提供一流的支援,因此大多數預先封裝的 Source 和 Sink 都依賴 Spring Integration、Spring Data 和 Spring Boot 的某種組合,這應該不足為奇。此外,它們旨在透過 @ConfigurationProperties 針對許多環境、網域和使用案例進行設定。雖然這些 Function 本身不是 Spring Boot 應用程式,但它們必須匯入到 Spring Boot 應用程式才能執行。

由於所有核心功能都由 Function 實作,因此我們希望將大部分測試工作集中在這個層級。為了確保我們的 Function 在所有預期的成功和錯誤條件下都能正確運作,我們需要編寫測試來涵蓋這些情境。這些測試需要建立自動設定的應用程式內容,並佈建或 Mock 所需的外部資源。

如果 Function 可透過 @ConfigurationProperties 設定,那麼我們可以將每個屬性組合視為不同的測試案例。有些屬性是必要的,有些是選用的。由於使用 Function 需要終端使用者提供這些屬性,因此預期的情境包括有效和無效的設定,例如缺少必要的屬性、無效的值或無效的組合 (互斥的屬性)。

單元測試與整合測試

這裡沒有廣泛接受的定義可以幫助我們。特別是對於 Source 和 Sink,其中核心功能整合,很難知道在哪裡劃分單元測試和整合測試之間的界線。一方面,Java Function 是一個單元,因為它是一個單一介面。但是,如果它的唯一目的是與遠端系統整合,則很難 (如果不是不可能) 單獨進行測試。但是,我認為我們可以就一些一般特徵達成共識

單元測試

  • 在任何開發人員或 CI 環境中,作為建置的一部分自動執行,無需任何外部設定

  • 速度合理快

  • 由開發人員編寫並經常執行

整合測試

  • 在整合環境中自動執行

  • 需要部署受測元件以及外部依賴項

  • 可能速度較慢

  • 執行頻率較低

根據單元測試的定義,第 1 部分是關於 Function 的單元測試。

測試容器

Testcontainers 是一個新興且熱門的 Java 程式庫,可讓您以程式設計方式啟動和丟棄任何可以在 Docker 容器中執行的外部資源。它包含數十個用於常用資源的現成模組。您也可以使用該程式庫以程式設計方式從 Dockerfile 或 docker-compose yaml 建立自訂容器。雖然主要用於整合測試,但在 Mock 需要付出更多努力時,它對於編寫單元測試非常有用。當然,我們必須犧牲一些速度,並放寬「沒有外部依賴項」規則,以允許在主機上安裝並執行 Docker Daemon。由於如今許多開發和 CI 環境已經需要使用和建置映像檔,因此這是一個合理的假設。

範例

Couchbase Consumer Function

為了說明,我們將編寫一個 Couchbase Consumer Function,以使用 upsert 作業將一些資料新增至 Couchbase 鍵值儲存區。

為了提高效率,我們將使用 Couchbase Java Client 的反應式 API 實作 Function。此 API 會傳回 MutationResult 的 Publisher,因此我們的核心介面是 Function<Flux<Message<?>>, Flux<MutationResult>>。此 Function 將使用 Spring 進行設定,並且可以嵌入到任何 Spring Boot 應用程式中。為了支援 couchbase-sink,我們將 Function 包裝在 Consumer<Flux<Message<?>>> 中。

upsert 作業會在 Bucket 中插入或更新資料,Bucket 是 Couchbase 的主要資料儲存區抽象概念。在我們的案例中,是 ReactiveBucket。Bucket 由名稱指定,並且必須事先存在於 Couchbase 叢集中。從 v6.5 開始,Couchbase 支援集合 (Collections)。因此,Bucket 可以分割成許多集合,但這是一個選用功能,必須在叢集中啟用。upsert 方法的目標是具名的集合,或預設集合 (defaultCollection)

我們在 Spring Message 中將索引鍵和值傳遞給 Function,Spring Message 包含 Payload 和標頭。Payload 可以是任何物件,而標頭本質上是一個 Map。為了使此 Function 具有通用性,我們可以使用 SpEL 運算式 來指定索引鍵。索引鍵運算式會針對 Message 進行評估,並且可以參照 Payload 中的欄位或方法,或標頭。值是 Payload。Function 也需要使用者指定 Bucket 和集合名稱。為了最大化彈性,讓我們加倍使用 SpEL,並將所有內容都設為運算式。現在,如果我們願意,Function 可以從執行階段的訊息中擷取其所有輸入值,以在任何 Bucket 中的任何集合中執行 Upsert 作業。在最簡單的情況下,Bucket 和集合可以靜態定義。

因此,Function 需要一些設定屬性

@ConfigurationProperties("couchbase.consumer")
@Validated
public class CouchbaseConsumerProperties {
    private static final String DEFAULT_VALUE_EXPRESSION = "payload";
    private final SpelExpressionParser parser = new SpelExpressionParser();

   /**
    * A SpEL expression to specify the bucket.
    */
    private Expression bucketExpression;

   /**
      * A SpEL expression to specify the key.
     */
    private Expression keyExpression;

  /**
    * A SpEL expression to specify the collection.
    */
    private Expression collectionExpression;

  /**
    * A SpEL expression to specify the value (default is payload).
    */
    private Expression valueExpression =
                parser.parseExpression(DEFAULT_VALUE_EXPRESSION);
    ...

提示

若要靜態設定其中一些值,請使用常值運算式,並以單引號括住值,例如 couchbase.consumer.bucketExpression='mybucket'。通常,您會從訊息內容中擷取索引鍵和值。

我們使用 Spring 設定反應式 Function 和對應的 Consumer

@Configuration
@EnableConfigurationProperties(CouchbaseConsumerProperties.class)
public class CouchbaseConsumerConfiguration {

    private static Logger logger =
            LoggerFactory.getLogger(CouchbaseConsumerConfiguration.class);

    @Bean
    public Consumer<Flux<Message<?>>> couchbaseConsumer(Function<Flux<Message<?>>,
                Flux<MutationResult>> couchbaseConsumerFunction) {
        return message -> couchbaseConsumerFunction.apply(message)
               .subscribe(mutationResult -> logger.debug("Processed " + message));
    }

    @Bean
    public Function<Flux<Message<?>>, Flux<MutationResult>> couchbaseConsumerFunction(
          Cluster cluster, CouchbaseConsumerProperties consumerProperties) {
        return flux -> flux.flatMap(message -> {
            logger.debug("Processing message " + message);
             String bucketName = bucket(message,
                          consumerProperties.getBucketExpression());
            String key = key(message, consumerProperties.getKeyExpression());
            ReactiveBucket bucket = cluster.bucket(bucketName).reactive();
             ReactiveCollection collection = collection(message,
                            consumerProperties.getCollectionExpression())
				  .map(name -> bucket.collection(name))
                                  .orElse(bucket.defaultCollection());
            return collection.upsert(key,
                              value(message, consumerProperties.getValueExpression()));
        });
    }

    private String bucket(Message<?> message, Expression expression) {
        return expression.getValue(message, String.class);
    }

    private String key(Message<?> message, Expression expression) {
        return expression.getValue(message, String.class);
    }

    private Object value(Message<?> message, Expression expression) {
        return expression.getValue(message);
    }

    private Optional<String> collection(Message<?> message,
                                             @Nullable Expression expression) {
        return expression == null ? Optional.empty() :
                Optional.of(expression.getValue(message, String.class));
    }
}

這兩個類別是實作 Function 所需的一切。所需的依賴項為

<dependency>
    <groupId>com.couchbase.client</groupId>
    <artifactId>java-client</artifactId>
</dependency>
<!-- Enable configuration properties metadata to be added to the jar -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
<!-- This provides a Spring Converter to convert Strings to Expression, required for CouchbaseConsumerProperties as implemented -->
<dependency>
    <groupId>org.springframework.cloud.fn</groupId>
    <artifactId>config-common</artifactId>
</dependency>

如先前所述,這不是 Spring Boot 應用程式,而是一個必須嵌入到 Spring Boot 應用程式才能執行的元件。Spring Boot 會繫結 @ConfigurationPropeties,並提供 CouchbaseAutoConfiguration

注意

此範例未使用 spring-data-couchbase,因為它是用於使用 Spring Data Repositories 和自動對應特定網域物件。由於我們的 Function 旨在處理任何 Payload 類型,因此我們使用 Boot 來自動設定叢集以及 Couchbase Java SDK。

那麼我們是如何最終得到一個實際運作的 Function 的呢?上面的範例程式碼是經過測試驅動開發的結果,經過多次迭代的完善。由於 Function 依賴於 Couchbase SDK Cluster 物件 (它完成所有工作),因此我們需要先建立一個 Cluster 執行個體才能執行任何操作。Cluster 需要連線到 Couchbase 伺服器。如果我們碰巧在我們的網路上已經執行了一個 Couchbase 叢集,並且有一個 Bucket 可以用於測試,那麼我們最初可能會使用它。但是,即使我們假設可以從我們的開發和 CI 環境存取 Couchbase,如果我們由於某些原因而無法連線到 Couchbase (叢集已關閉、憑證已過期、權限已變更或某些其他原因),會發生什麼情況?我們是否希望讓它中斷我們的 CI/CD 管線或阻止我們的進度?

幸運的是,我們可以使用 Testcontainers Couchbase 模組 來啟動我們自己的 Couchbase 環境。

注意

完整揭露:我也嘗試過 CouchbaseMock,但它似乎與目前的 couchbase Java Client 不相容。

Junit 5 的必要測試程式庫為

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>couchbase</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>

為了在我們的 Junit 5 測試類別中使用 Testcontainers,我們從設定了名為 test 的 Bucket 的 Couchbase 容器開始。

@Testcontainers
public class CouchbaseConsumerTests {

	@Container
	static CouchbaseContainer container = new CouchbaseContainer("couchbase/server:6.6.0")
			.withBucket(new BucketDefinition("test"));

@Testcontainers 註解啟用使用 @Container 註解的欄位的生命週期管理。在這裡,我們將 CouchbaseContainer 宣告為 static,因此 TestContainers 將在測試執行之前啟動容器一次,並在之後移除它。這是一件好事,因為啟動容器需要幾秒鐘的時間。

注意

另請查看 Playtika Testcontainers for Spring Boot。這是一個有趣的專案,它使用 Spring Boot 來抽象化「嵌入式」服務,以自動設定 Testcontainer。這需要您偏好的 org.springframework.cloud:spring-cloud-starter 版本。如果您使用的 Spring Cloud 版本與 Spring Boot 2.4+ 相容,則需要設定 "spring.cloud.bootstrap.enabled=true"。範例未使用此程式庫,因為 Spring Bean 無法宣告為 static,因此我們必須為每個測試啟動一個新的容器執行個體。無論如何,Testcontainers 真的很容易使用。

如上所述,不同的屬性設定代表不同的測試案例。Spring Boot 在應用程式啟動時從其屬性來源繫結屬性。因此,我們需要為我們想要測試的每個屬性組合建立一個新的應用程式內容。我們在 stream-applications 儲存庫中看到了幾種不同的策略

  • 建立一個抽象 @SpringBootTest 來設定 @SpringBootApplication 測試內容和共用設定屬性。為每個測試案例建立一個子類別,並使用 @TestPropertySource 註解,如 此處 所示。

  • 使用 ApplicationContextRunner 為每個測試案例建立一個新的 ApplicationContext,如 此處 所示。

  • 使用 SpringApplicationBuilder 為每個測試案例建立一個新的 ApplicationContext,如 此處 所示。

您使用哪一個很大程度上取決於個人選擇。範例 Function 的測試使用 ApplicationContextRunner,預先設定了由測試容器提供的必要 Boot Couchbase 連線屬性。Testcontainers 的一個優點是,它會如預期般公開標準埠,並將每個公開的埠對應到隨機可用的埠。Couchbase TestContainer 包含特定於 Couchbase 的 getConnectionString()。通常,您可以根據需要使用 container.getMappedPort(int originalPort)

提示

使用隨機 TCP 埠對於自動化測試至關重要,因為 1) 您不知道給定環境中可能正在使用哪些埠 2) 建置工具通常會並行執行測試。這通常會由於靜態定義時埠不可用而導致錯誤。

@Testcontainers
public class CouchbaseConsumerTests {

    @Container
    static CouchbaseContainer container =
            new CouchbaseContainer("couchbase/server:6.6.0")
                   .withBucket(new BucketDefinition("test"));

	private ApplicationContextRunner applicationContextRunner;

    @BeforeEach
    void setup() {
        applicationContextRunner = new ApplicationContextRunner()
            .withUserConfiguration(TestConfig.class)
            .withPropertyValues(
                 "spring.couchbase.connection-string=" +
                                                container.getConnectionString(),
                 "spring.couchbase.username=" + container.getUsername(),
                 "spring.couchbase.password=" + container.getPassword());
    }

我們使用 TestConfig.class 來啟動應用程式內容,我們將其作為內部類別提供

@SpringBootApplication
static class TestConfig {
    @Autowired
    Cluster cluster;

   @PreDestroy
    public void destroy() {
        cluster.disconnect();
    }
}

在許多情況下,這可以是一個使用 @SpringBootApplication 註解的空類別,以觸發屬性繫結以及任何必要的自動設定 (在本例中為 CouchbaseAutoConfiguration)。在這裡,我們與叢集斷線,以防止在內容關閉時出現多餘的堆疊追蹤。

對於這些測試,我們將建立一個簡單的 User 類型,其中包含名稱和電子郵件地址,我們可以將其用於索引鍵

@JsonIgnoreProperties(ignoreUnknown = true)
public class User {
	private String name;

	private String email;

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getEmail() {
		return email;
	}

	public void setEmail(String email) {
		this.email = email;
	}

	public User() {
	}

	public User(String name, String email) {
		this.name = name;
		this.email = email;
	}
    ...

現在我們準備好測試我們的 Function。由於 Function 是反應式的,我們將使用 reactor-test 程式庫中的 StepVerifier 來驗證傳回的 Flux 的內容。我們先從最簡單的順利路徑情境開始:upsert 單一使用者,提供最少的必要設定:Bucket 名稱和索引鍵運算式。我們將建構一個具有 User Payload 的 Message。為了將使用者儲存到 test Bucket 的預設集合中,使用使用者的電子郵件作為索引鍵,我們只需要將 Bucket 名稱作為常值提供,並將索引鍵運算式設定為 payload.email。這些屬性需要使用在 CouchbaseConsumerProperties 中設定的 couchbase.consumer 字首。至少,這是預期的行為。在我們可以驗證呼叫 Function 後資料是否存在於資料儲存區之前,我們無法確定所有這些都有效。我們直接使用 Couchbase API 來擷取資料,並斷言內容是我們期望的。

@Test
void singleUpsert() {
   applicationContextRunner.withPropertyValues(
           "couchbase.consumer.bucketExpression='test'",
            "couchbase.consumer.keyExpression=payload.email")
      .run(context -> {
           CouchbaseConsumerProperties properties =
                    context.getBean(CouchbaseConsumerProperties.class);
           String bucketName = properties.getBucketExpression().getValue(String.class);
           Cluster cluster = context.getBean(Cluster.class);
           Function<Flux<Message<?>>, Flux<MutationResult>>
                 couchbaseConsumerFunction =
                       context.getBean("couchbaseConsumerFunction", Function.class);
           StepVerifier.create(couchbaseConsumerFunction.apply(
               Flux.just(new GenericMessage<>(new User("David", "[email protected]")))))
            .expectNextMatches(mutationResult ->
                   mutationResult.mutationToken().get().bucketName().equals(bucketName))
            .verifyComplete();

        User saved = cluster.bucket(bucketName).defaultCollection()
                                   .get("[email protected]").contentAs(User.class);
       assertThat(saved.getName()).isEqualTo("David");
  });
}

透過先前顯示的 Function 實作,當我們在 IDE 中執行測試時,我們很高興看到綠色。實際上,我們首先需要像這樣的測試來編寫 Function。這就是為什麼我們對這個簡單的測試進行了大量的思考和努力。我們也想測試套用多個物件,以及為值和 Bucket 設定自訂運算式。我們可能還想檢查我們的屬性類別中的 Java 驗證註解。

@NotNull(message = "'keyExpression' is required")
public Expression getKeyExpression() {
    return keyExpression;
}

我忘記了,註解是放在 Getter 還是 Setter 上?我們真的需要 @Validated 類別註解嗎?讓我們找出答案。如果我們忘記設定 couchbase.consumer.keyExpression,我們應該在堆疊追蹤的某處收到例外訊息 'keyExpression is required'。如果沒有,那麼我們就做錯了什麼。幸運的是,spring-boot-starter-test 為我們提供了測試所需的一切,包括 Assertj (用於斷言的流暢 DSL)、Mockito 和 Junit 5。

@Test
void keyExpressionRequired() {
  assertThatExceptionOfType(RuntimeException.class).isThrownBy(
   () -> applicationContextRunner.withPropertyValues(
      "couchbase.consumer.bucket-expression='test'").run(context -> context.start()))
    .havingRootCause()
    .withMessageContaining("'keyExpression' is required");
}

到我們完成時,我們編寫的程式碼行數將是實作 Function 所需的兩倍以上,並且可能花費兩倍以上的時間。但是,這種努力是值得的,因為它向我們證明了 Function 在常見情境中按預期運作,並防止在重構或新增功能時引入回歸。完整的測試位於此處。我很高興地說,我的 IDE 報告的覆蓋率超過 90%。

結論

第 1 部分的測試主題到此結束。在這篇文章中,我們探討了用於測試與外部資源 (例如 Couchbase) 整合的 Function 的策略。我們也展示了 TestContainers 程式庫對於測試分散式系統的元件有多麼有用,尤其是在使用 Mock、Stub 或嵌入式伺服器不切實際時。第 2 部分 將涵蓋基於 Function 的串流應用程式的單元測試和整合測試。

敬請期待…​

感謝您的光臨!我們希望您覺得這些內容對您有所幫助。在本系列結束之前,我們還有幾篇文章。

取得 Spring 電子報

隨時關注 Spring 電子報

訂閱

搶先一步

VMware 提供培訓和認證,以加速您的進展。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

了解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

查看全部