在 Spring Cloud Stream Kafka 應用程式中與外部交易管理器同步

工程 | Soby Chacko | 2023 年 10 月 04 日 | ...

本部落格系列的其他部分

第 1 部分:Spring Cloud Stream Kafka 應用程式交易簡介

第 2 部分:Spring Cloud Stream Kafka 應用程式中由生產者啟動的交易

在本部落格系列前一部分中,我們看到了交易管理的基本知識,主要是在使用由生產者啟動的 Spring Cloud Stream Kafka 應用程式時。在該討論中,我們也簡要地了解了 Spring Cloud Stream Kafka 消費者應用程式如何以適當的隔離層級使用交易方式產生的記錄。當您與外部交易管理器(例如關係資料庫的交易管理器)同步時,我們提到您必須使用交易以確保資料完整性。在本文中,我們將了解在使用外部交易管理器時,如何在 Spring Cloud Stream 中實現交易保證。

在我們開始探索之前,重要的是要記住,在實務中實現分散式交易極其困難。您必須依賴兩階段提交 (2PC) 策略和適當的分散式交易管理器,例如 JTA 相容的交易管理器,才能正確地執行此操作。然而,大多數企業用例可能不需要這種複雜程度,而且我們考慮到並看到人們在實務中使用的多數用例,最好堅持使用非分散式交易方法,正如我們在本部落格中所述。這篇文章由 Spring 工程團隊的 Dr.Dave Syer 於 2009 年發布,對於理解分散式交易的挑戰以及 Spring 中建議的替代方法,至今仍然具有參考價值(即使在 14 年後)。

讓我們回到我們的討論:在使用外部交易管理器在生產者啟動和消費-處理-生產(讀取-處理-寫入)應用程式中,如何在 Spring Cloud Stream Kafka 應用程式中實現交易性。

現在,我們可以透過草擬一些程式碼來為我們的範例領域中的討論奠定基礎,我們可以在討論中逐步完成這些程式碼。我們使用一些領域物件來驅動演示,並為它們建立了虛擬程式碼。

假設訊息系統處理「事件」領域類型 - 讓我們使用 PersonEvent

class PersonEvent {

   String name;
   String type;

   //Rest omitted for brevity
}

我們還需要 Person 物件的領域實體

@Entity
@Table(name = "person")
public class Person {

   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long id;

   private String name;

   // Rest omitted for brevity
}

最後,我們需要 Person 領域物件的 CrudRepository

public interface PersonRepository extends CrudRepository<Person, String> {}

在生產者啟動的情境中,假設當呼叫方法(例如透過 REST)時,會建立 Person 領域物件、持久儲存到資料庫,並透過 StreamBridge 作為 PersonEvent 發送到輸出 Kafka 主題。

消費-處理-生產 情境中,假設輸入主題接收 PersonEvent,處理器從中產生 Person 領域物件以持久儲存到資料庫。最後,它產生另一個 PersonEvent 到輸出 Kafka 主題。

讓我們也在這裡使用 JPA 進行討論。Spring Cloud Stream 應用程式是 Boot 應用程式,您可以在應用程式中包含 spring-boot-starter-jpa 依賴項,並包含適當的 spring.jpa.* 屬性來驅動必要的自動配置。假設 Spring Boot 將為我們自動配置 JPATransactionManager

讓我們將我們的用例分解為各種情境。

情境 1:生產者啟動的交易

在生產者啟動的情境中,我們有兩個必須以交易方式執行的操作:資料庫操作,然後是 Kafka 發佈操作。這是基本概念。請記住,此程式碼僅顯示所涉及內容的重點。在真實世界的設定中,程式碼幾乎肯定會比這複雜得多。

@Autowired
Sender sender;

@PostMapping("/send-data")
public void sendData() {
   sender.send(streamBridge, repository);
}

@Component
static class Sender {

   @Transactional
   public void send(StreamBridge streamBridge, PersonRepository repository) {
       Person person = new Person();
       person.setName("Some Person");

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);
   }
}

上述生產者啟動的程式碼是完全交易性的。在本部落格的前一部分中,我們看到,如果您只有 Kafka 交易,則僅新增 Transactional 註解是不夠的。正如討論的那樣,Transactional 註解沒有交易管理器,我們需要一個自訂交易管理器,該管理器使用相同的底層交易資源來實現交易性。然而,這裡的情況有所不同。我們有 Spring Boot 自動配置的 JpaTransactionManager,交易攔截器使用它來啟動交易。由於我們配置了 transaction-id-prefix,因此 StreamBridge 發送操作可以以交易方式完成。但是,KafkaTemplate 會透過 TransactionSynchronizationManager 將 Kafka 交易與已存在的 JPA 交易同步。在方法結束時,首先提交主要交易,然後是同步交易,在本例中為 Kafka 交易。

此流程中的順序如下。
  1. JPA 交易管理器啟動新的 JPA 交易。
  2. 資料庫操作開始,但此處未發生提交,因為我們仍在方法執行中。
  3. StreamBridge 發送操作觸發新的 Kafka 交易,透過交易同步管理器與 JPA 交易同步。
  4. 當方法結束時,首先提交 JPA 交易,然後提交 Kafka 交易。

關於在 Spring 中同步交易的一般注意事項: 這聽起來可能像是在幕後進行複雜的交易同步。但是,正如我們在本文開頭所暗示的那樣,這裡沒有進行分散式交易同步,更不用說任何在各種交易之間同步的智慧方法。交易本身對同步一無所知。Spring 的 TransactionSynchronizatonManager 僅協調多個交易的提交和回滾。在這種情況下,同步交易在功能上類似於巢狀兩個或多個 @Transactional 方法或 TransactionTempate 物件。配置較少,因為 Spring 為您執行巢狀。

情境 2:反轉提交順序

假設我們需要由於流程中的一些新需求而反轉提交順序,讓 Kafka 交易先提交而不是 JPA 交易。我們該怎麼做?一種可能直觀地想到的解決方案是顯式地為 @Transactional 註解提供 Kafka 交易管理器,並讓 JPA 交易與 Kafka 交易同步,後者是主要的交易。程式碼如下所示

@Transactional(“customKafkaTransactionManager)
public void send(StreamBridge streamBridge, PersonRepository repository) {
    Person person = new Person();
    person.setName("Some Person");

    Person savedPerson = repository.save(person);

    PersonEvent event = new PersonEvent();
    event.setName(savedPerson.getName());
    event.setType("PersonSaved");
    streamBridge.send("process-out-0", event);
}

我們需要提供自訂 Kafka 交易管理器

@Bean
KafkaTransactionManager customKafkaTransactionManager() {
   KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder) this.binderFactory.getBinder("kafka", MessageChannel.class);
   ProducerFactory<byte[], byte[]> transactionalProducerFactory = kafka.getTransactionalProducerFactory();
   KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager(transactionalProducerFactory);
   return kafkaTransactionManager;
}

由於 Spring Boot 在偵測到已存在交易管理器時不會配置交易管理器,因此我們必須自行配置 JPA 交易管理器

@Bean
public PlatformTransactionManager transactionManager(
       ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
   JpaTransactionManager transactionManager = new JpaTransactionManager();
   transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(transactionManager));
   return transactionManager;
}

我們的直覺在這裡奏效了嗎?我們是否已成功更改交易的應用順序?不幸的是,沒有。它不起作用,因為 JPA 交易管理器不允許其交易與其他交易同步,例如本例中來自主要交易管理器的交易(自訂 Kafka 交易管理器)。在我們的例子中,儘管我們使自訂 Kafka 交易管理器成為主要的交易管理器,但 JPA 交易在執行儲存庫儲存方法時自行啟動和提交,而沒有與主要交易同步。

此流程中的事件順序如下
  1. Kafka 交易管理器啟動攔截器使用的新交易。
  2. 當儲存庫儲存方法執行時,JpaTransactionManager 會建立 JPA 交易,而沒有與主要交易同步。
  3. JPA 交易在方法執行期間提交。
  4. 攔截器將在方法結束時提交 Kafka 交易。

那麼,我們如何反轉交易呢?有兩種方法可以做到這一點。

首先,我們可以嘗試鏈式交易管理器。ChainedTransactionManager 是來自 Spring Data 專案的交易管理器實作。您可以指定要給 ChainedTransactionManager 的交易管理器列表,它會按照其列表中交易管理器的順序啟動交易。在退出時(即方法結束時),交易會按照交易管理器列表的相反順序提交。

雖然這聽起來像是一個合理的策略,但要記住的一個很大的注意事項是,ChainedTransactionManager 目前已棄用,並非建議的選項。棄用的原因是 Javadoc 中所述。要點是人們通常期望 ChainedTransactionManager 成為解決所有交易問題的神奇銀彈,包括具有兩階段提交和其他問題的分散式交易,但這與事實相去甚遠。ChainedTransactionManager 僅確保交易以特定順序啟動和提交。它不保證任何交易同步,更不用說任何分散式交易協調。假設您對 ChainedTransactionManager 的限制感到滿意,並且想要特定的順序,正如我們的用例所要求的那樣。在這種情況下,只要您記住您正在使用框架中已棄用的類別,就可以合理地使用此交易管理器。

讓我們在我們的情境中嘗試 ChainedTransactionManager,看看效果如何。Spring for Apache Kafka 提供了一個名為 ChainedKafkaTransactionManager 的子類別,它也已棄用,因為父類別已棄用。

我們使用與之前在鏈式交易中看到的相同的自訂 KafkaTransactionManager bean。

我們還需要建立 JpaTransactionManager bean,與之前一樣,因為 Spring Boot 不會自動配置它,因為它已經偵測到自訂 KafkaTransactionManager bean。

一旦我們新增了這兩個 bean,讓我們建立 ChainedKafkaTransactionManager bean

@Bean
public ChainedKafkaTransactionManager chainedKafkaTransactionManager(KafkaTransactionManager kafkaTransactionManager, PlatformTransactionManager transactionManager) {
   return new ChainedKafkaTransactionManager(jpaTransactionManager, kafkaTransactionManager);
}

完成這些設定後,讓我們修改 Transactional 註解

@Transactional("chainedKafkaTransactionManager")
public void send(StreamBridge streamBridge, PersonRepository repository) {
..
}

上述配置完成了我們想要的結果。當您執行此應用程式時,我們反轉了交易,正如預期的那樣 - 也就是說,Kafka 將首先提交,然後是 JPA。

以下是流程中的步驟
  1. TransactionInterceptor 使用自訂 ChainedKafkaTransactionManager 來啟動交易。它使用 JpaTransactionManager 啟動 Jpa 交易,並對 KafkaTransactionManager 執行相同的操作。
  2. 當方法呼叫資料庫操作時,由於它已在 JPA 交易中執行,因此不會啟動另一個交易。此處未發生提交或回滾,因為這不是新的交易。
  3. 接下來,方法透過 StreamBridge 執行 Kafka 發佈。我們看到與上面 JPA 相同的情況。由於已存在 Kafka 交易,因此它不會啟動新的 Kafka 交易。StreamBridge 發送操作是透過使用初始 Kafka 交易使用的相同交易生產者工廠來完成的。此處未發生提交或回滾。
  4. 當方法結束時,鏈式交易管理器會按相反順序執行,從 Kafka 交易提交(或回滾)開始,然後是 JPA 交易。

如果您對鏈式交易管理器的限制感到滿意,則此方法有效。請記住,這裡沒有交易同步。交易管理器在交易開始時按給定的順序應用,而在提交或回滾時按相反的順序應用。如果您要選擇此途徑,由於您正在使用框架中已棄用的類別,因此複製它們並在您的專案中使用它們將是一個好主意,而不是依賴框架。由於它們已棄用,因此不保證會有新功能和錯誤修復。未來的版本可能會完全刪除它們。也可能永遠不會刪除它,並且棄用狀態的存在是為了勸阻其使用(因為人們認為它具有比實際更大的功能)。

如果您不想依賴框架中已棄用的類別,或者不想複製它們並在您的終端維護它們,您可以嘗試另一個選項。您可以建立兩個交易方法並巢狀呼叫。以下是該想法的藍圖

@Component
static class Sender {

       @Transactional("jpaTransactionManager")
       public void send(StreamBridge streamBridge, PersonRepository repository, KafkaSender kafkaSender) {
           Person person = new Person();
           person.setName("Some Person");

           Person savedPerson = repository.save(person);

           PersonEvent event = new PersonEvent();
           event.setName(savedPerson.getName());
           event.setType("PersonSaved");
           kafkaSender.send(streamBridge, event);
       }
}

@Component
static class KafkaSender {
       @Transactional("customKafkaTransactionManager")
       public void send(StreamBridge streamBridge, PersonEvent event) {
           streamBridge.send("process-out-0", event);
       }
}

確保巢狀呼叫在不同的類別中,原因我們在 本部落格系列的第 2 部分 中討論過,因為 Spring 中的 AOP 代理運作方式。

在這種情況下,兩種方法都是交易性的,並且它們是巢狀的。當交易攔截器攔截第一個方法呼叫時,它會啟動 JPA 交易。在執行過程中,巢狀呼叫(其方法也具有 @Transactional 註解)進入。由於此 bean 方法具有 @Transactional 註解,因此 Spring AOP 將 bean 包裝在 AOP 通知中。由於我們從不同類別中的另一個 bean 呼叫此已通知的 bean,因此代理機制會正確地調用已通知的 bean。另一個交易攔截器使用不同的交易管理器(即 KafkaTransactionManager)啟動新的交易。當 Kafka 發佈發生時,交易不會立即提交或回滾,因為交易是作為方法的一部分啟動的,並且提交或回滾發生在方法結束時。此時,控制權返回到第一個方法並繼續。一旦它退出原始方法,JPA 交易就會透過攔截器提交。如果發佈到 Kafka 的方法拋出異常,它會回滾該交易。在這種情況下,在回滾後,異常會傳播回第一個交易方法(JPA 方法),該方法也會由於異常而回滾其交易。

使用此技術時的重要注意事項 巢狀方法的呼叫應該是第一個方法執行的最後一件事,因為如果第一個方法在 Kafka 呼叫成功後未能執行某些程式碼,則 Kafka 交易已提交。第一個方法中的失敗不會自動回滾 Kafka 交易。

情境 3:消費-處理-生產

透過我們到目前為止在本系列中獲得的關於交易的核心理解,讓我們看看事件驅動和串流應用程式中的一個關鍵模式,稱為 消費-處理-生產 模式。在 Spring Cloud Stream 中,這種模式的實作如下所示

@Bean
public Function<PersonEvent, PersonEvent> process(TxCode txCode) {
  return pe -> txCode.run(pe);
}

@Component
class TxCode {

   @Transactional
   PersonEvent run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       return event;
   }
}

我們有一個 Spring Cloud Stream 函數,它從輸入主題消費 PersonEvent,然後呼叫函數以在函數的 lambda 運算式的主體中處理。此函數傳回另一個 PersonEvent,我們將其發佈到輸出 Kafka 主題。如果我們不在交易上下文中,我們可以將上述 run 方法內聯作為函數的 lambda 運算式的一部分。但是,為了實現交易語意,@Transactional 註解必須位於不同類別的方法上。

為了使 binder 具有交易性,請確保您提供具有有效值的 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix

上面的程式碼是否完全交易性?然而,現實情況是,它僅在端對端方面是部分交易性的。讓我們看看事件順序。

binder 是交易性的,因為我們提供了 transaction-id-prefix。當消費者輪詢訊息監聽器容器中的記錄時,它會在 TrasactionTemplate#execute 方法中調用內部監聽器方法。因此,執行監聽器方法(調用使用者方法)的整個端對端流程都在 KafkaTransactionManager 啟動的交易中執行。當交易啟動時,TransactionSynchronizationManager 會將資源(生產者)綁定到交易。當使用者方法(使用 @Transactional 註解的方法)被呼叫時,交易攔截器會攔截該呼叫,讓包裝的 AOP 通知處理實際的調用。由於我們有 JpaTransactionManager,因此交易攔截器會使用該管理器並啟動新的交易。是否要與現有交易同步取決於每個交易管理器實作的決定。在 JpaTransactionManager(和許多其他類似的資料庫交易管理器實作)的情況下,它不允許與現有交易同步,正如我們在上面已經討論過的那樣。因此,JPA 交易獨立執行,如上面的章節所示。當 run 方法結束時,交易攔截器會使用 JPA 交易管理器執行提交或回滾操作。這樣,JPA 交易管理器就完成了它的工作。此時,來自方法調用的回應會返回給呼叫者,即 Spring Cloud Stream 基礎架構。Spring Cloud Stream 中的此機制會取得此回應,並將其發送到 Kafka 中的輸出主題。它使用在初始交易開始時綁定的相同交易生產者。在發送記錄後,控制權返回到訊息監聽器容器,然後容器提交或回滾交易。

以下是此順序中的步驟
  1. Kafka 消費者接收記錄。
  2. Spring Kafka 中的容器透過使用 TransactionTemplateexecute 方法來調用監聽器。

KafkaTransactionManager 啟動新的交易。3. Kafka 資源已綁定(生產者)。4. 當它到達使用者程式碼時,交易攔截器最終會攔截該呼叫並啟動新的 JPA 交易。5. AOP 代理接著調用實際方法。當方法結束時,JpaTransactionManager 提交或回滾。6. 方法的輸出返回到 Spring Cloud Stream 中的呼叫者。7. 然後使用步驟 4 中的相同交易資源將回應發送到 Kafka 輸出。8. 控制權返回到訊息監聽器容器,並且 KafkaTransactionManager 提交或回滾。

那麼,這裡的問題是什麼?它看起來是交易性的,但實際上,它只是部分交易性的。最初的主要問題是,整個端對端流程都在單一原子交易的範圍之外,這是一個重大問題。這裡有兩個交易 - Kafka 和 JPA - JPA 和 Kafka 交易之間沒有同步。如果資料庫交易已提交,而 Kafka 發送失敗,則無法回滾 JPA 交易。

我們可能會認為 ChainedTransactionManager 可以在這裡提供幫助。雖然這種直覺有一些優點,但它不適用於上面的程式碼。由於在調用監聽器方法時在容器中建立的 Kafka 交易,ChainedTransactionManager 不會從提供給它的任何 Kafka 交易管理器建立任何新的 Kafka 交易。當退出使用者方法時,我們仍然需要提交或回滾單一 JPA 交易。Kafka 交易必須等到呼叫返回容器才能提交或回滾。

問題是我們在 Spring Cloud Stream 中使用一個函數,該函數使框架能夠發佈到 Kafka。在我們的例子中,任何使用者指定的交易(例如 JPA 交易)都發生在 Spring Cloud Stream 執行 Kafka 發佈之前。我們需要確保使用者程式碼是發佈到 Kafka 的程式碼,以便我們可以將整個交易程式碼視為一個單元。為了實現這一點,我們應該切換到 Consumer 而不是 Function,然後使用 StreamBridge API 發佈到 Kafka。看看這個修改後的程式碼

@Bean
public Consumer<PersonEvent> process(TxCode txCode) {
   return txCode::run;
}

然後我們使用與上面相同的 TxCode

@Component
class TxCode {

   @Transactional
   void run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);
   }
}

請注意,run 方法不傳回任何內容,但我們透過 StreamBridge API 顯式地發送到輸出 Kafka 主題。

讓我們看看這些變更後的事件順序
  1. Kafka 消費者接收記錄。
  2. Spring Kafka 中的容器透過使用 TransactionTemplate 的 execute 方法來調用監聽器。
  3. KafkaTransactionManager 啟動新的交易。
  4. Kafka 資源已綁定(生產者)。
  5. 當它到達使用者程式碼時,攔截器會攔截該呼叫,並使用 JpaTransactionManager 啟動新的交易。
  6. 調用實際的使用者方法。
  7. Kafka 發送操作是透過 StreamBridge 作為方法執行的一部分來完成的。底層 KafkaTemplate 使用步驟 4 中綁定的相同交易生產者工廠。
  8. 當方法結束時,JpaTransactionManager 提交或回滾。
  9. 最後,當 Kafka 交易提交(或回滾)時,控制權返回到 TransactionTemplate#execute 方法。

請特別注意上面的步驟 7。當 KafkaTemplate 偵測到已存在正在進行的 Kafka 交易(在步驟 3 中開始)時,即使 KafkaTemplate 有能力這樣做,它也不會與 JPA 交易同步。現有的 Kafka 交易優先,並且它會加入該交易。

即使我們仍然有兩個單獨的交易,但從端對端交易的角度來看,事情是原子的。如果透過 StreamBridge 的 Kafka 發佈操作失敗,則 JPA 和 Kafka 交易都不會執行提交操作。兩者都會回滾。同樣地,如果資料庫操作失敗,兩個交易仍然會回滾。但是,始終存在一個交易提交而另一個交易回滾的可能性,因此應用程式程式碼必須處理記錄的重複資料刪除,以實現容錯能力。

關於 消費-處理-生產 模式的討論中的另一個關鍵組件是,生產者需要將消費的記錄的偏移量(除了提交偏移量的消費者之外)發送到交易。正如我們在本部落格系列第一部分中看到的那樣,Kafka Producer API 有一個名為 sendOffsetToTransaction 的方法,其中生產者透過 OffsetMetadataConsumerGroupMetadata 為每個分割區發送偏移量(當前訊息的偏移量 + 1)。當使用 Spring Cloud StreamSpring for Apache Kafka 時,應用程式不需要調用這個低階操作。Spring for Apache Kafka 中的 Kafka 訊息監聽器容器會代表應用程式自動處理它。雖然框架在交易提交之前在生產者上呼叫 sendOffsetToTransaction,但在交易協調器提交交易時,將偏移量發送到交易和實際的消費者偏移量提交會原子地發生。

透過此討論,我們冒險進入編寫交易性 Spring Cloud Stream 應用程式的各種選項,這些應用程式必須在消費和生產 Apache Kafka 時與外部交易系統(例如資料庫)互動。

在本系列的下一部分中,我們將研究交易回滾(編寫交易系統時的另一個關鍵方面),以及在編寫 Spring Cloud Stream Kafka 應用程式時如何存取各種 Spring 組件。

取得 Spring 電子報

與 Spring 電子報保持聯繫

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進展。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

了解更多

即將到來的活動

查看 Spring 社群中所有即將到來的活動。

查看全部