搶先一步
VMware 提供培訓和認證,以加速您的進展。
瞭解更多這是「測試 Stream 應用程式」的第二部分。在第一部分中,我們實作並測試了範例 couchbase-sink 應用程式所需的核心功能。函數層級的測試涵蓋了預期的成功和錯誤情境,並依賴 Testcontainers 來佈建 Couchbase 叢集。這篇文章假設您已閱讀第一部分,並從上次結束的地方繼續。
在第一部分中,我們驗證了為將資料 upserting 到 Couchbase 而編寫的函數如預期般運作。我們現在可以使用該函數(以 java.util.Consumer
的形式公開)來實作一個 sink,用於使用 Spring Cloud Stream 建置的資料管線中。與大多數預先封裝的 stream 應用程式一樣,我們只需將函數配置嵌入到 Spring Boot 應用程式中。與為 Kafka 和 Rabbit 生成相同應用程式的預先封裝應用程式不同,我們將自行開發一個使用 Kafka binder 的應用程式。
以下是主要應用程式類別
@SpringBootApplication
@Import(CouchbaseConsumerConfiguration.class)
public class CouchbaseSinkApplication {
public static void main(String... args) {
new SpringApplication(CouchbaseSinkApplication.class).run(args);
}
}
我們還需要新增一些依賴項:函數、Spring Cloud Stream 和 Kafka binder。
<dependency>
<groupId>io.spring.example</groupId>
<artifactId>couchbase-consumer</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
而且,由於我們是自行開發,因此我們可以在 application.properties
中設定一些必要的屬性。由於 couchbase-consumer
包含 2 個候選函數,因此我們需要告訴 Spring Cloud Stream 使用 Consumer
wrapper。此外,我們將預設消費者輸入綁定名稱 couchbaseConsumer-in-0
別名為 input
,以便 sink 可以與 Spring Cloud Data Flow 搭配使用。
spring.cloud.function.definition=couchbaseConsumer
spring.cloud.stream.function.bindings.couchbaseConsumer-in-0=input
就這樣!至少我們認為是這樣。我們如何確定?不出所料,我們需要的測試類型與函數層級測試類似。但我們並不需要真正執行每個測試案例,因為我們已經知道函數在具有各種屬性設定的 boot 應用程式中的行為方式。但我們實際上還沒有透過 Spring Cloud Stream 叫用該函數。而且,它並不需要花費太多,因為我們可以重複使用我們為函數編寫的大部分測試程式碼。因此,我們只需要一個「冒煙測試」來執行 happy path,以確保我們沒有遺漏某些必要的依賴項,或者我們的配置屬性中存在錯字,或者現在沒有任何陷阱,或者當我們在未來升級某些依賴項時。在這裡,我們配置了一個 Couchbase TestContainer,就像我們測試函數一樣。但我們不會直接叫用該函數,而是讓 Spring Cloud Stream 在我們將訊息傳送到為 sink 配置的輸入目的地時執行它。對於此測試,我們使用 TestChannelBinder
,這是一個由以下依賴項提供的記憶體內 binder
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<classifier>test-binder</classifier>
<scope>test</scope>
</dependency>
我們使用 TestChannelBinderConfiguration.getCompleteConfiguration(CouchbaseSinkApplication.class)
將 TestChannelBinder 新增到我們測試的應用程式上下文中。這為我們提供了一個 InputDestination
bean,用於將訊息傳送到 sink。與函數測試中一樣,我們使用 Cluster
物件來驗證資料是否存在於 Couchbase 中。由於 upsert 操作是非同步的,因此我們需要輪詢資料儲存區一段時間,直到資料存在為止。awaitility 程式庫非常適合測試非同步系統。在這種情況下,我們將給它 10 秒鐘,然後我們假設操作已失敗。
@Testcontainers
public class CouchbaseSinkApplicationTests {
@Container
static CouchbaseContainer container =
new CouchbaseContainer("couchbase/server:6.6.0")
.withBucket(new BucketDefinition("test"));
static Map<String, Object> connectProperties = new HashMap<>();
@BeforeAll
static void initialize() {
connectProperties.put("spring.couchbase.connection-string", container.getConnectionString());
connectProperties.put("spring.couchbase.username", container.getUsername());
connectProperties.put("spring.couchbase.password", container.getPassword());
}
@Test
void test() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(CouchbaseSinkApplication.class))
.web(WebApplicationType.NONE)
.properties(connectProperties)
.run("--couchbase.consumer.bucketExpression='test'",
"--couchbase.consumer.keyExpression=payload.email")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
Cluster cluster = context.getBean(Cluster.class);
inputDestination.send(new GenericMessage<>(
new User("Bart Simpson", "[email protected]")));
await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
User user = cluster.bucket("test")
.defaultCollection().get("[email protected]")
.contentAs(User.class);
assertThat(user).isNotNull();
assertThat(user.getName()).isEqualTo("Bart Simpson");
});
}
}
}
此時,我們在應用程式和函數測試之間具有良好的測試覆蓋率。但我們尚未驗證我們想要建置和部署的應用程式二進位檔在真正的整合環境中是否運作。由於 sink 應用程式使用 Kafka binder,因此整合測試環境需要 Kafka broker、Couchbase 叢集和我們部署的應用程式。我們可以部署並直接執行 Spring Boot 可執行 jar 檔。現在更常見的是容器映像。
一般而言,假設以容器形式建置的 sink 可以運作並不是太冒險,但我們至少要確保我們知道如何配置應用程式以使用外部 Kafka broker 和 Couchbase 叢集,以及我們是否正確建置了映像。
對於預先建置的 Spring Cloud Stream 應用程式,我們有更多理由測試建置的成品。核心應用程式不提供任何額外的程式碼。相反,我們使用 spring-cloud-dataflow-apps-generator-plugin 自動生成可以與 Kafka 或 RabbitMQ 一起運行的相同應用程式。外掛程式需要 Maven 配置,我們為每個應用程式手動新增配置。僅僅因為我們的函數與 TestChannelBinder 搭配運作,我們就不能確定建置的成品是否運作,除非我們運行它。錯誤配置 apps generator 外掛程式、外掛程式本身的變更、基礎映像或任何依賴項都可能導致某些問題。Testcontainers 和 Junit 5 為我們提供了一種相對簡單的方法,可以使用 Kafka 和 RabbitMQ 整合測試預先建置的應用程式。為了幫助我們編寫整合測試,我們在 stream-applications-test-suport 中提供額外支援。社群可以透過新增依賴項來使用此程式庫
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>stream-applications-test-support</artifactId>
<scope>test</scope>
</dependency>
範例包含一個整合測試,用於測試建置的映像,在本例中是使用 Spring Boot Maven 外掛程式 建置的。與應用程式測試一樣,我們只需插入 Kafka、Couchbase 和我們的映像,開啟電源,並確保我們沒有看到或聞到任何煙霧。
完整的整合測試如下
@KafkaStreamAppTest
@Tag("integration")
public class CouchbaseSinkIntegrationTests {
static StreamAppContainer sink =
new KafkaStreamAppContainer("couchbase-sink:0.0.1-SNAPSHOT");
@Container
static CouchbaseContainer container =
new CouchbaseContainer("couchbase/server:6.6.0")
.withNetwork(KafkaConfig.kafka.getNetwork())
.withNetworkAliases("couchbase-server")
.withBucket(new BucketDefinition("test"));
static Cluster cluster;
@Autowired
TestTopicSender testTopicSender;
@BeforeAll
static void initialize() {
await().until(() -> container.isRunning());
String connectionString = "couchbase://couchbase-server";
sink.waitingFor(Wait.forLogMessage(".*Started CouchbaseSink.*", 1))
.withLogConsumer(appLog("couchbase-sink"))
.withCommand(
"--spring.couchbase.connection-string=couchbase://couchbase-server",
"--spring.couchbase.username=" + container.getUsername(),
"--spring.couchbase.password=" + container.getPassword(),
"--couchbase.consumer.bucket-expression='test'",
"--couchbase.consumer.key-expression=payload.email")
.start();
cluster = Cluster.connect(container.getConnectionString(),
ClusterOptions.clusterOptions(container.getUsername(), container.getPassword()));
}
@AfterAll
static void stop() {
sink.stop();
}
@Test
void test() throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
testTopicSender.send(sink.getInputDestination(),
objectMapper.writeValueAsString(
new User("Bart Simpson", "[email protected]")));
await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
ExistsResult result = cluster.bucket("test")
.defaultCollection().exists("[email protected]");
assertThat(result.exists()).isTrue();
});
User user = objectMapper.readValue(
cluster.bucket("test").defaultCollection().get("[email protected]")
.contentAs(String.class), User.class);
assertThat(user.getName()).isEqualTo("Bart Simpson");
}
}
為了拆解這個,讓我們從 @KafkaStreamAppTest
類別註解開始。這會啟動 Kafka 測試容器,並使用 Spring for Apache Kafka 配置 Kafka 組件,我們可以將其用於與 Kafka 生產和消費訊息。Kafka 容器在靜態初始化器中啟動,這使其成為真正的單例,允許在 JVM 中運行的每個測試都使用它。除了 Spring 配置外,註解還包括作為元註解的 @TestContainers
。對於此測試,我們不讓 Testcontainers 管理 StreamAppContainer
的生命週期,因為我們希望在知道 Couchbase 叢集正在運行後再啟動它。Couchbase 容器有一些額外的配置。為了方便起見,它與 StreamAppContainer
共用一個虛擬網路(自動配置為使用與 Kafka 容器相同的網路)。這允許 Stream App Container 使用我們選擇的別名 couchbase-server
連接到 Couchbase 伺服器(請記住,容器內的 localhost
指的是其自身的 IP 位址)。
@Container
static CouchbaseContainer container = new CouchbaseContainer("couchbase/server:6.6.0")
.withNetwork(KafkaConfig.kafka.getNetwork())
.withNetworkAliases("couchbase-server")
.withBucket(new BucketDefinition("test"));
StreamAppContainer 是一個 GenericContainer,具有連接到 Kafka 並使用 Kafka binder 所需的配置。Spring 配置還在已知主題上設定監聽器,以消費容器的任何輸出。在本例中未使用它,因為我們只有 sink 的輸入。輸入目的地是隨機生成的,並且透過 getInputDestination()
存取。
static StreamAppContainer sink = new KafkaStreamAppContainer("couchbase-sink:0.0.1-SNAPSHOT");
...
@BeforeAll
static void initialize() {
await().until(() -> container.isRunning());
String connectionString = "couchbase://couchbase-server";
sink.waitingFor(Wait.forLogMessage(".*Started CouchbaseSink.*", 1))
.withLogConsumer(appLog("couchbase-sink"))
.withCommand(
"--spring.couchbase.connection-string=couchbase://couchbase-server",
"--spring.couchbase.username=" + container.getUsername(),
"--spring.couchbase.password=" + container.getPassword(),
"--couchbase.consumer.bucket-expression='test'",
"--couchbase.consumer.key-expression=payload.email")
.start();
一旦 Couchbase 容器正在運行,我們將啟動 sink。我們等待標準 Spring Boot 啟動訊息,以確認 sink 已啟動。我們還新增了 LogConsumer 以輸出所有日誌訊息,以防發生錯誤。請注意,連線字串僅使用 Couchbase 容器的網路別名。這是可能的,因為 sink 和 Couchbase 使用相同的虛擬網路。在這裡,我們在命令列上傳遞所有屬性,但我們也可以透過 withEnvironment()
將它們設定為環境變數。由於我們控制 sink 生命週期,因此我們需要在所有測試完成後停止它。
該測試使用自動裝配的 TestTopicSender
。這是一個與中介軟體無關的介面,在本例中由 KafkaTemplate 支援。此介面適用於為 Kafka 和 Rabbit 運行相同的測試案例。在這裡,我們也可以自動裝配 KafkaTemplate
。在撰寫本文時,僅為 Kafka 範本配置了 String serdes,因此我們使用 ObjectMapper
來處理字串。
@Test
void test() throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
testTopicSender.send(sink.getInputDestination(),
objectMapper.writeValueAsString(
new User("Bart Simpson", "[email protected]")));
await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
ExistsResult result = cluster.bucket("test")
.defaultCollection().exists("[email protected]");
assertThat(result.exists()).isTrue();
});
User user = objectMapper.readValue(
cluster.bucket("test").defaultCollection().get("[email protected]")
.contentAs(String.class), User.class);
assertThat(user.getName()).isEqualTo("Bart Simpson");
}
由於此測試需要 sink 映像,因此我們使用 Junit 5 @Tag
註解將其標記為整合測試。我們還配置了 Maven 以將其從正常建置中排除,並且僅在設定 integration 設定檔時才建置映像並運行它。完整的原始程式碼在此,需要 Java 8+ 和 Docker。
在這篇文章中,我們探討了測試與外部服務(例如 Couchbase)整合的 Spring Cloud Stream 應用程式的策略。大部分測試(在第一部分中描述)都是在函數層級完成的。應用程式和整合測試實際上是冒煙測試,用於驗證我們是否已正確建置、配置和整合所有內容。我們還展示了如何使用 TestContainers 測試 Stream 應用程式。
感謝您的光臨!我們希望您覺得此內容對您有幫助。在本系列結束之前,我們還有幾篇文章。