使用 Spring Cloud Stream 和 Apache Kafka 的事務回滾策略

工程 | Soby Chacko | 2023 年 10 月 11 日 | ...

此部落格系列的其他部分

第 1 部分:Spring Cloud Stream Kafka 應用程式中的事務簡介

第 2 部分:Spring Cloud Stream Kafka 應用程式中生產者啟動的事務

第 3 部分:在 Spring Cloud Stream Kafka 應用程式中與外部事務管理器同步

在本部落格系列的過去三個章節中,我們分析了事務在 Spring Cloud Stream Kafka 應用程式中的運作方式。我們遇到了事務有用的不同情境,包括生產者和消費者應用程式,以及應用程式如何正確使用它們。既然這些基本要素已在我們身後,讓我們繼續討論事務的另一個面向:當錯誤發生時回滾事務。當錯誤發生且事務系統無法提交事務時,事務管理器會回滾事務,並且不會持久化任何內容供下游消費者查看。如果應用程式可以指示此回滾機制的運作方式,將會很有幫助。Spring Cloud Stream 通過 Spring 對 Apache Kafka 的基本支援來促進此回滾自訂。我們必須注意關於生產者和消費者(consume-process-produce)事務應用程式的一些事項。我們將巡覽這些事項。

生產者啟動的事務

以下是我們在前一篇文章中看過的一段程式碼片段。

@Transactional        
public void send(StreamBridge streamBridge)      
{
    for (int i = 0; i < 5; i++) {
      streamBridge.send("mySupplier-out-0", "my data: " + i);           
    }
}

如果事務方法拋出例外,我們該怎麼辦?答案是從 Spring Cloud Stream 的角度來看,我們不需要做任何事情。事務攔截器會啟動回滾,最終,Kafka 中的事務協調器會中止事務。最終,例外會傳播到呼叫者,然後如果錯誤是暫時性的,它可以決定重新觸發事務方法。框架不會重試,因為這是生產者啟動的事務。這種情況很簡單,因為在事務回滾期間,我們不需要從應用程式或框架的角度做任何事情。如果發生錯誤,保證會回滾。然而,回想一下,即使事務已回滾,Kafka 日誌中可能仍有未提交的記錄。隔離等級為 read_uncommitted(預設值)的消費者仍然會收到這些記錄。因此,消費者應用程式必須確保它們使用 read_committed 的隔離等級,以便它們不會收到任何被上游事務回滾的記錄。

生產者啟動的事務與外部事務同步

我們在本部落格系列的最後一部分看到了這種情況。與第一種情況一樣,如果方法拋出例外並發生回滾,即使 Kafka 事務與資料庫事務同步,應用程式也無需執行任何操作來處理錯誤。事務會從資料庫和 Kafka 發布中回滾。

消費者啟動的事務回滾

如果生產者啟動的事務回滾如此簡單,您可能會想知道有什麼大不了的,以及為什麼我們必須專門用一整篇文章來討論這個主題。何時應用程式需要提供特定的回滾策略?當您正在進行消費者啟動的事務時,這才有意義,因為我們需要特別注意如何處理已消耗記錄的狀態及其偏移量。讓我們重新檢查我們從本系列先前部落格中執行的消費者啟動的事務方法程式碼。

public Consumer<PersonEvent> process(TxCode txCode) {
   return txCode::run;
}

@Component
class TxCode {

   @Transactional
   void run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);
   }
}

您可能還記得,這是一種consume-process-produce模式,從頭到尾以事務方式完成。如果事務方法拋出例外怎麼辦?在這裡,我們需要了解框架在回滾事務時如何處理已消耗的記錄。Spring for Apache Kafka 中的底層訊息監聽器容器允許設定回滾處理器

訊息監聽器容器使用來自上次消費者輪詢的剩餘記錄(失敗記錄位於列表的開頭)調用 AfterRollbackProcessor API。實作使用主題/分區資訊來確保在下一次輪詢期間再次提取失敗的記錄。當應用程式在 Spring Cloud Stream 中啟用事務時,我們使用名為 DefaultAfterRollbackProcessor 的預設實作,該實作實作 AfterRollbackProcessor API。因此,當事務回滾時,此實作預設會啟動。讓我們檢查一下當此 AfterRollbackProcessor 運作時會發生什麼情況。

Spring Cloud Stream 允許您通過消費者綁定設定方法調用重試的最大次數。例如,spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts。最大嘗試次數值包括初始嘗試。此值的預設值為三次。如果您想停用重試,您可以將此值設定為一次。在這種情況下,框架只會嘗試記錄一次。此值包含記錄的第一次嘗試。因此,在預設的三次情況下,綁定器會在初始嘗試後重試兩次。

當使用者方法拋出例外時,容器最初啟動的事務會回滾。由於我們處於事務上下文中,因此容器接著會在新的事務中使用事務範本調用 AfterRollbackProcessor 的處理方法,這會啟動新的 Kafka 事務。在執行 AfterRollbackProcessor 的處理方法時,它會檢查是否還有任何待處理的重試,這基於最大嘗試次數配置。如果它發現更多重試,它會提交當前事務,這是一個空操作,因為在檢查期間沒有發生任何事情。消費者尋找會與失敗的記錄一起發生,以便下一次輪詢返回此失敗的記錄。然後,消費者輪詢更多記錄,這會重新傳遞失敗的記錄。整個流程再次開始並繼續。如果再次失敗,它會重複直到用盡所有可用的重試次數。一旦所有重試次數都用盡,AfterRollbackProcessor 會調用已註冊的恢復器。Spring Cloud Stream 註冊一個恢復器,該恢復器會將錯誤記錄傳送到錯誤通道。之後,輸入(已恢復)記錄的偏移量會傳送到新的事務。在此之後,當前事務會提交,這會以原子方式將偏移量傳送到事務並提交記錄的偏移量。現在流程已完成。已恢復的記錄不包含在消費者尋找中,並且下一次輪詢會傳回新的記錄。

如果恢復因任何原因失敗,則容器的行為就好像重試尚未用盡,並進入無休止的重試。如上所述,當恢復成功時,失敗的記錄不會包含在尋找中,因此下一次輪詢不會傳回該記錄。

假設應用程式將最大嘗試次數設定為兩次,並且記錄兩次都失敗,則以下是使用事務時的事件順序

  1. 消費者輪詢記錄,並且在 TransactionTemplate 的 execute 方法中調用 Spring Kafka 中的監聽器,這會觸發 KafkaTransactionManager 啟動新的事務。
  2. 最終,監聽器調用使用者方法,該方法使用 @Transactional 註解。
  3. 事務攔截器攔截事務方法,並通過使用其事務管理器啟動新的 JPA 事務。
  4. 當它到達資料庫操作時,由於我們處於方法執行的中間,因此不會發生提交或回滾。
  5. StreamBridge 調用 send 方法,該方法發布到 Kafka 主題。這裡不會啟動新的 Kafka 事務,因為已經有一個 Kafka 事務正在進行中。KafkaTemplate 使用相同的事務資源(生產者)進行發布。
  6. 該方法從其任何操作中拋出例外,並且事務攔截器捕獲該例外並對 JPA 事務執行回滾。
  7. 例外會傳播回 Spring Kafka 中的訊息監聽器容器,其中監聽器通過 TransactionTemplate 的 execute 方法調用使用者方法。然後,它會回滾 Kafka 事務。
  8. 此時,容器會在新的事務中調用 AfterRollbackProcessor,因為我們處於事務上下文中。它在其 TransactionTemplate 上啟動另一個 execute 操作,通過 KafkaTransactionManager 建立新的 Kafka 事務。
  9. TransactionTemplate 的 execute 方法調用 AfterRollbackProcessor API 的處理方法並立即返回,因為還剩一次重試(因為我們最多有兩次嘗試)。
  10. 然後,容器提交新的 Kafka 事務,關閉事務而不做任何事情 - 基本上是一個空操作。
  11. 下一個消費者輪詢重新傳遞失敗的記錄,容器通過在新的事務中再次調用監聽器來重試(步驟 1)。
  12. 重複步驟 2 - 8。
  13. TransactionTemplate 的 execute 方法調用 AfterRollbackProcessor 的處理方法,並發現沒有剩餘更多重試次數。
  14. process 方法調用已註冊的恢復器。由於我們將其作為 Spring Cloud Stream 應用程式執行,因此預設恢復器會傳送到錯誤通道。
  15. 在記錄恢復後,已恢復記錄的偏移量(最初由消費者消耗)偏移量會通過使用事務上的生產者傳送到事務。
  16. 一旦 AfterRollbackProcessor 中的 process 方法返回,容器就會調用事務的提交操作,這會以原子方式將偏移量傳送到事務並執行消費者偏移量提交。

為什麼我們需要在上面的步驟 8 中建立新的事務,以及每次在失敗嘗試後調用 AfterRollbackProcessor 時?為什麼我們不能在提交原始 Kafka 事務之前調用 AfterRollbackProcessor?雖然在每次執行回滾後任務的失敗嘗試後建立新的 Kafka 事務可能聽起來像是多餘的開銷,但這是必要的。當原始事務中發生回滾時,它不會將偏移量傳送到事務。如果有重試,容器會在新的事務中再次調用監聽器,並且循環會繼續,直到重試用盡並且記錄被恢復。容器建立並回滾的事務可能與最大嘗試次數一樣多,而沒有將偏移量傳送到事務。每次原始事務回滾時,容器都會為 AfterRollbackProcessor 調用啟動相應的新事務,其提交是空操作(除了恢復後的最後一次提交)。在恢復記錄後,最後一次調用會將偏移量傳送到事務,以便以原子方式提交偏移量並在 Kafka 端執行必要的事務清理。因此,正如我們所看到的,為了將偏移量傳送到事務,我們需要在新的事務中調用 AfterRollbackProcessor

自訂 AfterRollbackProcessor

如果應用程式想要自訂回滾後任務,而不是使用 Spring Cloud Stream 使用的預設 - DefaultAfterRollbackProcessor - 那麼它可以使用 ListenerContainerCustomizer 來提供自訂的 AfterRollbackProcessor。以下列表顯示了如何執行此操作

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
   return (container, destination, group) -> container.setAfterRollbackProcessor(
           new DefaultAfterRollbackProcessor<byte[], byte[]>(
                   (record, exception) -> System.out.println("Discarding failed record: " + record),
                   new FixedBackOff(0L, 1)));
}

當提供上述自訂時,恢復器會記錄錯誤並繼續。DefaultAfterRollbackProcessor 的建構子也採用退避,且沒有重試。因此,在此範例中,只要方法中第一次發生例外,記錄就會通過記錄來恢復。

記錄恢復期間的事務性 DLQ 發布

Spring Cloud Stream 允許您在用盡所有重試次數後,將失敗的記錄作為恢復過程的一部分發送到唯一的 DLQ(死信佇列)主題。我們提到 Spring Cloud Stream Kafka 綁定器使用的 DefaultAfterRollbackProcessor 會將記錄發送到錯誤通道。當應用程式啟用 DLQ 時,綁定器會將失敗的記錄發送到特殊的 DLT 主題。這如何發生的詳細資訊不在我們的事務討論範圍內。然而,問題是 DLT 發布是否是事務性的。當設定 DLQ 基礎架構時,如果應用程式使用事務(也就是說,它提供 transaction-id-prefix),則綁定器會使用 KafkaTransactionManager 中使用的相同原始事務生產者工廠。因此,框架保證以事務方式發布到 DLT

通過本文中的討論,我們涵蓋了在 Spring Cloud Stream Kafka 應用程式中使用事務時的所有主要建構塊。在本部落格系列的下一節中,我們將研究 Kafka 中事務的實際應用,即流行的精確一次語義,以及我們如何在 Spring Cloud Stream Kafka 應用程式中啟用它們。

取得 Spring 電子報

保持與 Spring 電子報的聯繫

訂閱

搶先一步

VMware 提供培訓和認證,以加速您的進展。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

了解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

查看所有