取得領先
VMware 提供培訓和認證,以加速您的進展。
了解更多本部落格系列的其他部分
第 1 部分:Spring Cloud Stream Kafka 應用程式交易簡介
本文是部落格系列的第 2 部分,我們將詳細探討 Spring Cloud Stream 和 Apache Kafka 中的交易。我們在前一部分中看到了交易的一般簡介,觸及了基本概念。在本部落格系列的這一部分中,我們將深入了解一些實作細節及其應用層面。
在本文中,我們主要停留在生產者端,以了解交易如何與 Spring Cloud Stream 和 Apache Kafka 協同運作。
在深入探討生產者啟動的交易之前,我們先從一些基礎知識開始,看看一個簡單的生產者。在 Spring Cloud Stream 中,有幾種方法可以編寫生產者(在訊息傳遞領域中也稱為發布者)。如果您有需要依排程產生資料的使用案例,您可以編寫一個 java.util.function.Supplier
方法,如下所示。
@Bean
public Supplier<Pojo> mySupplier() {
return () -> {
new Pojo();
};
}
當提供上述 Supplier 作為 Spring Bean 時(如程式碼所示),Spring Cloud Stream 會將其視為發布者,並且由於我們在此處處於 Apache Kafka 的上下文中,因此它會將 POJO 記錄發送到 Kafka 主題。
預設情況下,Spring Cloud Stream 每秒調用供應者一次,但您可以透過組態變更該排程。請參閱參考文件以取得更多詳細資訊。
如果您不想輪詢供應者,而是想控制其發布頻率怎麼辦?Spring Cloud Stream 透過 StreamOperations
API 及其開箱即用的實作 StreamBridge
提供了一種便利的方式。以下是一個範例。
@Autowired
StreamBridge streamBridge;
@PostMapping("/send-data")
public void publishData() {
streamBridge.send("mySupplier-out-0", new Pojo());
}
在本例中,應用程式使用 REST 端點來觸發透過 StreamBridge
發布資料。由於框架不會依排程調用該函數,因此任何外部方都可以透過調用 REST 端點來啟動資料發布。
在這些基本生產者中使用交易是否合適?
現在我們已經了解了 Spring Cloud Stream 為發布記錄提供的兩種策略,讓我們回到我們的主要討論主題:交易式發布。假設有一種情境,我們希望在使用這些生產者中的一個或多個時確保資料完整性並獲得交易保證。在這種情況下,問題是我們是否首先需要使用交易才能達成它們。在上面的兩個範例中,您如何確保記錄以交易方式發布?簡短的答案是,您應該避免針對這些類型的發布使用案例使用交易。這些範例中的記錄發布是單次發送情境。使用同步生產者,我們可以達到相同的語意交易保證。預設情況下,生產者是非同步的,當使其在同步模式下運行時,生產者會確保在向客戶端發送回應之前,將記錄寫入領導者和所有副本。您可以透過將 spring.cloud.stream.kafka.bindings.<binding-name>.producer.sync
屬性設定為 true
來啟用同步發布。
總結來說,在設計僅生產者的應用程式時,請謹慎地使用交易。如果您使用 Supplier
或透過 StreamBridge
一次發送一筆記錄,我們不建議使用交易,因為將生產者轉換為同步模式執行將達成相同的結果,而不會產生交易額外負擔。然後,此討論引出了一個有趣的問題。對於僅生產者的應用程式,何時有必要使用交易並獲得好處?如本部落格系列的前一部分中所討論的,這完全取決於應用程式的使用案例。在生產者的情境中,這表示只有當我們需要進行多個相關的發布,或者除了發布之外,我們還需要與外部交易管理器同步時,我們才需要使用交易。本文的下一節涵蓋前一種情境,而本部落格系列的下一篇文章涵蓋後一種情境。
在 Spring Cloud Stream 的 Kafka Binder 中啟用交易的主要驅動因素是一個單一屬性:spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix
。當此屬性具有有效的前綴字串時,Spring Cloud Stream 中的 Kafka Binder 會確保底層 KafkaTemplate
使用交易發布資料。順帶一提,此屬性會向 Spring Cloud Stream 發出信號,使其在使用處理器模式(consume-process-produce 或 read-process-write 模式)時,讓消費者感知交易。
雖然違反直覺,但讓我們回到我們單一 Supplier
或 StreamBridge
範例(如前所述),並引入交易以了解交易組件的主要用途。如前所述,在這些情況下我們不需要使用交易,因為這會增加更多額外負擔。然而,這樣做有助於我們理解事物。
以下是程式碼再次呈現
@SpringBootApplication
@RestController
public class SimpleSpringCloudStreamProducer {
@Bean
public Supplier<Pojo> mySupplier() {
return () -> {
new Pojo();
};
}
@Autowired
StreamBridge streamBridge;
@PostMapping("/send-data")
public void publishData() {
streamBridge.send("mySupplier-out-0", new Pojo());
}
}
現在讓我們提供必要的屬性。
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: my-transactional-producer-
由於我們在應用程式的組態中提供該屬性,因此每次調用此範例中的供應者(透過框架)或有人調用 StreamBridge#send
方法後面的 REST 端點時,底層發布到 Kafka 主題的操作都會變成完全交易式。
當供應者被觸發時,Kafka Binder 會使用 KafkaTemplate
發布資料。當 Binder 偵測到應用程式提供了 transaction-id-prefix
屬性時,每次 KafkaTemplate#send
調用都會透過 KafkaTemplate#executeInTransaction
方法完成。因此,請放心,框架確實以交易方式執行底層發布到 Kafka 主題的操作。從應用程式角度來看,應用程式開發人員為交易目的需要提供的唯一事情是 transaction-id-prefix
屬性。
在開發或偵錯交易式應用程式時,將記錄層級設定為 TRACE
通常是值得的,這樣相關的底層交易類別可以為我們提供有關正在發生的事情的詳細資訊。
例如,如果您在以下套件上將記錄層級設定為 TRACE,您將在記錄中看到相當多的活動。
logging:
level:
org.springframework.transaction: TRACE
org.springframework.kafka.transaction: TRACE
org.springframework.kafka.producer: TRACE
org.springframework.kafka.core: TRACE
每次框架調用供應者方法時,我們可以在追蹤記錄中觀察到以下內容
o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1426370c] beginTransaction()
o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord
o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=myTopic1, partition=null, headers=RecordHeaders(headers = …
o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1426370c] commitTransaction()
您可以從追蹤記錄中看到,每次以交易方式發布記錄時,它都會形成一個序列:beginTransaction、Sending、Sent 和 commitTransaction。如果您運行應用程式,您將觀察到每秒都會看到這些序列,因為這是 Spring Cloud Stream 調用 Supplier
方法的預設排程頻率。
相同的交易流程也適用於 StreamBridge#send
的情況。當 Spring Cloud Stream 調用 send 方法時,輸出綁定使用的底層 KafkaTemplate
會確保記錄在交易內發布,因為我們提供了 transaction-id-prefix
。
完成基礎知識講解後,讓我們繼續討論適合使用交易的情況。正如我們之前討論過的,需要將多筆記錄作為單一原子單元發布是一個有效的情境,在這種情境下,使用交易變得必要。
讓我們看看以下程式碼範例
public void publish(StreamBridge streamBridge {
for (int i = 0; i < 5; i++) {
streamBridge.send("mySupplier-out-0", "data-" + i);
}
}
如您所見,這是一個為了示範風險而設計的範例。我們發布了多筆記錄,而不是發布一次。發布到多個主題在這裡也是同樣有效的方法。我們可能會認為,我們可以透過設定 transaction-id-prefix
屬性,快速地將多筆記錄的發布包裝在單一交易中。然而,我們需要的不僅僅是這些才能在這裡幫助我們。我們仍然需要提供前綴屬性。然而,僅憑這一點,每次發送仍然發生在其專用交易中。為了確保所有五筆記錄的整個端到端發布以原子方式發生,我們需要在方法上應用來自核心 Spring Framework 的 @Transactional
註解。此外,我們必須提供一個交易管理器 Bean - KafkaTransactionManager
- 它使用 Spring Cloud Stream Kafka Binder 建立的相同生產者工廠。以下是我們目前的程式碼和應用程式的組態
@SpringBootApplication
@RestController
public class SpringCloudStreamProducer {
@Autowired
StreamBridge streamBridge;
@Autowired Sender sender;
@Autowired
DefaultBinderFactory binderFactory;
public static void main(String[] args) {
SpringApplication.run(SpringCloudStreamProducer.class, args);
}
@PostMapping("/send-data")
public void publishData() throws InterruptedException {
sender.send(streamBridge);
}
@Component
static class Sender {
@Transactional
public void send(StreamBridge streamBridge)
{
for (int i = 0; i < 5; i++) {
streamBridge.send("mySupplier-out-0", "data-" + i);
}
}
}
@Bean
KafkaTransactionManager customKafkaTransactionManager() {
KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder)this.binderFactory.getBinder("kafka", MessageChannel.class);
ProducerFactory<byte[], byte[]> transactionalProducerFactory = kafka.getTransactionalProducerFactory();
KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager(transactionalProducerFactory);
return kafkaTransactionManager;
}
}
以及相應的組態
spring:
cloud:
stream:
bindings:
mySupplier-out-0:
destination: my-topic
kafka:
binder:
Transaction:
transaction-id-prefix: mySupplier-
producer:
configuration:
retries: 1
acks: all
請注意,前面程式碼中的交易方法(使用 @Transactional
註解的方法)必須與調用該方法的類別不同。如果調用發生在同一類別的方法之間,或發生在非 Spring 管理的 Bean 的不同類別之間,則不會有代理,並且交易攔截器不會啟動。JVM 在運行時不知道代理和攔截器機制。當在方法上新增 @Transactional
註解時,Spring 會在幕後為該方法建立交易代理。當 Spring Cloud Stream 調用交易方法時,代理會攔截該調用,然後透過代理物件發生實際調用。
我們提供的自訂 KafkaTransactionManager
Bean 有兩個目的。首先,它使 Spring Boot 應用 @EnableTransactionManagerment
。它還提供 Binder 內部使用的相同生產者工廠,以便 Transactional 註解在應用交易時使用正確的資源。
當 Spring Boot 偵測到可用的交易管理器 Bean 時,它會自動為我們應用 @EnableTransactionManagement
註解,該註解負責偵測 @Transactional
註解,然後透過 Spring AOP 代理和通知機制新增攔截器。換句話說,Spring AOP 為 @Transactional
方法建立代理,並包含 AOP 通知。如果未應用 @EnableTransactionManagement
註解,Spring 就不會觸發任何這些代理和攔截機制。由於 EnableTransactionManagement 註解對於這些各種原因至關重要,因此我們必須提供交易管理器 Bean。否則,方法上的 Transactional 註解就沒有任何意義。
請注意,我們正在從 Binder 取得交易式生產者工廠,並在 KafkaTransactionManager
的建構子中使用它。當此 Bean 出現在應用程式中時,現在所有記錄的整個發布都發生在單一交易的範圍內。我們在追蹤記錄中只看到 beginTransaction…commitTransaction 的單一序列,這表示只有一個適當的交易執行所有發布操作。
在幕後,這些是事件順序
StreamBridge#send
方法時,底層 KafkaTemplate
將使用自訂 KafkaTransactionManager
建立的相同交易資源。由於交易已在進行中,因此它不會啟動另一個交易,而是發布發生在相同的交易生產者上。KafkaResourceHolder
commit 或 rollback 方法,這些方法調用 Kafka 生產者來提交或回滾交易。由於在我們的範例中只有一個自訂 KafkaTransactionManager
Bean,因此我們可以簡單地按原樣使用 Transactional 註解。另一方面,如果我們有多個自訂 KafkaTransactionManager
Bean,我們必須使用正確的 Bean 名稱限定 @Transactional
註解。
如果我們移除自訂 KafkaTransactionManager
並運行此應用程式,您可以看到它建立五個個別交易,而不是單一交易。如果您啟用 TRACE
記錄,您可以在記錄中看到五個 beginTransaction…commitTransaction 序列。
您可以透過編寫交易式消費者 Spring Cloud Stream 應用程式並將其隔離層級設定為 read_committed
來驗證此行為。您可以使用 spring.cloud.stream.kafka.binder.configuration.isolation.level
屬性並將其值設定為 read_committed
來執行此操作。為了測試目的,新增 Thread.sleep
或其他等待機制,以模擬 for 迴圈中每次 StreamBridge#send
之後的行為。您可以看到,一旦每次 send
方法調用返回,無論等待時間長短,消費者都會收到資料,從而證明並非單一交易執行了整個操作,而是每次 send
都發生在自己的交易中。
我們看到每次發送的個別交易,因為 Transactional
註解沒有做我們期望它做的事情。Transactional
註解僅在有可用的交易管理器 Bean 且其生產者工廠與 Binder 使用的相同時才有效。
如果 Spring Boot 透過 spring.kafka.producer.transaction-id-prefix
偵測到組態中的 transaction-id-prefix
屬性,則會自動組態 KafkaTransactionManager
。然而,由於我們處於 Spring Cloud Stream 情境中,因此我們必須使用 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix
,因為這是我們向框架發送信號,以為 Binder 和關聯的交易式生產者工廠建立內部交易管理器的方式。如果我們提供適當的 spring.kafka
前綴,以便 Spring Boot 為我們自動組態 KakaTransactionManager
會怎樣?雖然這非常誘人,但它不起作用,因為自動組態的交易管理器使用的生產者工廠與 Binder 使用的不同。因此,我們必須提供一個自訂 KafkaTransactionManager
,它使用與 Binder 相同的生產者工廠。這正是我們上面所做的。
在本部落格系列的下一部分中,我們將學習如何針對生產者和消費者啟動的交易與外部交易管理器同步。