Spring Cloud Stream Kafka 應用程式中的生產者啟動交易

工程 | Soby Chacko | 2023 年 9 月 28 日 | ...

本部落格系列的其他部分

第 1 部分:Spring Cloud Stream Kafka 應用程式交易簡介

本文是部落格系列的第 2 部分,我們將詳細探討 Spring Cloud Stream 和 Apache Kafka 中的交易。我們在前一部分中看到了交易的一般簡介,觸及了基本概念。在本部落格系列的這一部分中,我們將深入了解一些實作細節及其應用層面。

在本文中,我們主要停留在生產者端,以了解交易如何與 Spring Cloud Stream 和 Apache Kafka 協同運作。

Spring Cloud Stream 中的生產者

在深入探討生產者啟動的交易之前,我們先從一些基礎知識開始,看看一個簡單的生產者。在 Spring Cloud Stream 中,有幾種方法可以編寫生產者(在訊息傳遞領域中也稱為發布者)。如果您有需要依排程產生資料的使用案例,您可以編寫一個 java.util.function.Supplier 方法,如下所示。

@Bean
public Supplier<Pojo> mySupplier() {
  return () -> {
        new Pojo();
  };
}

當提供上述 Supplier 作為 Spring Bean 時(如程式碼所示),Spring Cloud Stream 會將其視為發布者,並且由於我們在此處處於 Apache Kafka 的上下文中,因此它會將 POJO 記錄發送到 Kafka 主題。

預設情況下,Spring Cloud Stream 每秒調用供應者一次,但您可以透過組態變更該排程。請參閱參考文件以取得更多詳細資訊。

如果您不想輪詢供應者,而是想控制其發布頻率怎麼辦?Spring Cloud Stream 透過 StreamOperations API 及其開箱即用的實作 StreamBridge 提供了一種便利的方式。以下是一個範例。

@Autowired
StreamBridge streamBridge;

@PostMapping("/send-data")
public void publishData() {
   streamBridge.send("mySupplier-out-0", new Pojo());
}

在本例中,應用程式使用 REST 端點來觸發透過 StreamBridge 發布資料。由於框架不會依排程調用該函數,因此任何外部方都可以透過調用 REST 端點來啟動資料發布。

在這些基本生產者中使用交易是否合適?

現在我們已經了解了 Spring Cloud Stream 為發布記錄提供的兩種策略,讓我們回到我們的主要討論主題:交易式發布。假設有一種情境,我們希望在使用這些生產者中的一個或多個時確保資料完整性並獲得交易保證。在這種情況下,問題是我們是否首先需要使用交易才能達成它們。在上面的兩個範例中,您如何確保記錄以交易方式發布?簡短的答案是,您應該避免針對這些類型的發布使用案例使用交易。這些範例中的記錄發布是單次發送情境。使用同步生產者,我們可以達到相同的語意交易保證。預設情況下,生產者是非同步的,當使其在同步模式下運行時,生產者會確保在向客戶端發送回應之前,將記錄寫入領導者和所有副本。您可以透過將 spring.cloud.stream.kafka.bindings.<binding-name>.producer.sync 屬性設定為 true 來啟用同步發布。

總結來說,在設計僅生產者的應用程式時,請謹慎地使用交易。如果您使用 Supplier 或透過 StreamBridge 一次發送一筆記錄,我們不建議使用交易,因為將生產者轉換為同步模式執行將達成相同的結果,而不會產生交易額外負擔。然後,此討論引出了一個有趣的問題。對於僅生產者的應用程式,何時有必要使用交易並獲得好處?如本部落格系列的前一部分中所討論的,這完全取決於應用程式的使用案例。在生產者的情境中,這表示只有當我們需要進行多個相關的發布,或者除了發布之外,我們還需要與外部交易管理器同步時,我們才需要使用交易。本文的下一節涵蓋前一種情境,而本部落格系列的下一篇文章涵蓋後一種情境。

在 Spring Cloud Stream Kafka Binder 中啟用交易

在 Spring Cloud Stream 的 Kafka Binder 中啟用交易的主要驅動因素是一個單一屬性:spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix。當此屬性具有有效的前綴字串時,Spring Cloud Stream 中的 Kafka Binder 會確保底層 KafkaTemplate 使用交易發布資料。順帶一提,此屬性會向 Spring Cloud Stream 發出信號,使其在使用處理器模式(consume-process-produceread-process-write 模式)時,讓消費者感知交易。

交易實作

雖然違反直覺,但讓我們回到我們單一 SupplierStreamBridge 範例(如前所述),並引入交易以了解交易組件的主要用途。如前所述,在這些情況下我們不需要使用交易,因為這會增加更多額外負擔。然而,這樣做有助於我們理解事物。

以下是程式碼再次呈現

@SpringBootApplication
@RestController
public class SimpleSpringCloudStreamProducer {

  @Bean
  public Supplier<Pojo> mySupplier() {
    return () -> {
      new Pojo();
    };
  }

  @Autowired
  StreamBridge streamBridge;

  @PostMapping("/send-data")
  public void publishData() {
   streamBridge.send("mySupplier-out-0", new Pojo());
  }
}

現在讓我們提供必要的屬性。

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: my-transactional-producer-

由於我們在應用程式的組態中提供該屬性,因此每次調用此範例中的供應者(透過框架)或有人調用 StreamBridge#send 方法後面的 REST 端點時,底層發布到 Kafka 主題的操作都會變成完全交易式。

當供應者被觸發時,Kafka Binder 會使用 KafkaTemplate 發布資料。當 Binder 偵測到應用程式提供了 transaction-id-prefix 屬性時,每次 KafkaTemplate#send 調用都會透過 KafkaTemplate#executeInTransaction 方法完成。因此,請放心,框架確實以交易方式執行底層發布到 Kafka 主題的操作。從應用程式角度來看,應用程式開發人員為交易目的需要提供的唯一事情是 transaction-id-prefix 屬性。

在開發或偵錯交易式應用程式時,將記錄層級設定為 TRACE 通常是值得的,這樣相關的底層交易類別可以為我們提供有關正在發生的事情的詳細資訊。

例如,如果您在以下套件上將記錄層級設定為 TRACE,您將在記錄中看到相當多的活動。

logging:
 level:
   org.springframework.transaction: TRACE
   org.springframework.kafka.transaction: TRACE
   org.springframework.kafka.producer: TRACE
   org.springframework.kafka.core: TRACE

每次框架調用供應者方法時,我們可以在追蹤記錄中觀察到以下內容

o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1426370c] beginTransaction()
o.s.kafka.core.KafkaTemplate             : Sending: ProducerRecord
o.s.kafka.core.KafkaTemplate             : Sent: ProducerRecord(topic=myTopic1, partition=null, headers=RecordHeaders(headers = …
o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1426370c] commitTransaction()

您可以從追蹤記錄中看到,每次以交易方式發布記錄時,它都會形成一個序列:beginTransactionSendingSentcommitTransaction。如果您運行應用程式,您將觀察到每秒都會看到這些序列,因為這是 Spring Cloud Stream 調用 Supplier 方法的預設排程頻率。

相同的交易流程也適用於 StreamBridge#send 的情況。當 Spring Cloud Stream 調用 send 方法時,輸出綁定使用的底層 KafkaTemplate 會確保記錄在交易內發布,因為我們提供了 transaction-id-prefix

多筆記錄發布的交易

完成基礎知識講解後,讓我們繼續討論適合使用交易的情況。正如我們之前討論過的,需要將多筆記錄作為單一原子單元發布是一個有效的情境,在這種情境下,使用交易變得必要。

讓我們看看以下程式碼範例

public void publish(StreamBridge streamBridge {
  for (int i = 0; i < 5; i++) {
    streamBridge.send("mySupplier-out-0", "data-" + i);
  }
}

如您所見,這是一個為了示範風險而設計的範例。我們發布了多筆記錄,而不是發布一次。發布到多個主題在這裡也是同樣有效的方法。我們可能會認為,我們可以透過設定 transaction-id-prefix 屬性,快速地將多筆記錄的發布包裝在單一交易中。然而,我們需要的不僅僅是這些才能在這裡幫助我們。我們仍然需要提供前綴屬性。然而,僅憑這一點,每次發送仍然發生在其專用交易中。為了確保所有五筆記錄的整個端到端發布以原子方式發生,我們需要在方法上應用來自核心 Spring Framework 的 @Transactional 註解。此外,我們必須提供一個交易管理器 Bean - KafkaTransactionManager - 它使用 Spring Cloud Stream Kafka Binder 建立的相同生產者工廠。以下是我們目前的程式碼和應用程式的組態

@SpringBootApplication
@RestController
public class SpringCloudStreamProducer {

   @Autowired
   StreamBridge streamBridge;

   @Autowired Sender sender;

   @Autowired
   DefaultBinderFactory binderFactory;

   public static void main(String[] args) {
       SpringApplication.run(SpringCloudStreamProducer.class, args);
   }

   @PostMapping("/send-data")
   public void publishData() throws InterruptedException {
       sender.send(streamBridge);
   }

   @Component
   static class Sender {

     @Transactional        
     public void send(StreamBridge streamBridge)      
     {
       for (int i = 0; i < 5; i++) {
     	   streamBridge.send("mySupplier-out-0", "data-" + i);           
       }
     }
   }

  @Bean
  KafkaTransactionManager customKafkaTransactionManager() {
     KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder)this.binderFactory.getBinder("kafka", MessageChannel.class);
     ProducerFactory<byte[], byte[]> transactionalProducerFactory = kafka.getTransactionalProducerFactory();
     KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager(transactionalProducerFactory);
     return kafkaTransactionManager;
  }
}

以及相應的組態

spring:
  cloud:
   stream:
     bindings:
       mySupplier-out-0:
         destination: my-topic
     kafka:
       binder:
         Transaction:
		transaction-id-prefix: mySupplier-
producer:
             configuration:
               retries: 1
               acks: all

請注意,前面程式碼中的交易方法(使用 @Transactional 註解的方法)必須與調用該方法的類別不同。如果調用發生在同一類別的方法之間,或發生在非 Spring 管理的 Bean 的不同類別之間,則不會有代理,並且交易攔截器不會啟動。JVM 在運行時不知道代理和攔截器機制。當在方法上新增 @Transactional 註解時,Spring 會在幕後為該方法建立交易代理。當 Spring Cloud Stream 調用交易方法時,代理會攔截該調用,然後透過代理物件發生實際調用。

我們提供的自訂 KafkaTransactionManager Bean 有兩個目的。首先,它使 Spring Boot 應用 @EnableTransactionManagerment。它還提供 Binder 內部使用的相同生產者工廠,以便 Transactional 註解在應用交易時使用正確的資源。

當 Spring Boot 偵測到可用的交易管理器 Bean 時,它會自動為我們應用 @EnableTransactionManagement 註解,該註解負責偵測 @Transactional 註解,然後透過 Spring AOP 代理和通知機制新增攔截器。換句話說,Spring AOP 為 @Transactional 方法建立代理,並包含 AOP 通知。如果未應用 @EnableTransactionManagement 註解,Spring 就不會觸發任何這些代理和攔截機制。由於 EnableTransactionManagement 註解對於這些各種原因至關重要,因此我們必須提供交易管理器 Bean。否則,方法上的 Transactional 註解就沒有任何意義。

請注意,我們正在從 Binder 取得交易式生產者工廠,並在 KafkaTransactionManager 的建構子中使用它。當此 Bean 出現在應用程式中時,現在所有記錄的整個發布都發生在單一交易的範圍內。我們在追蹤記錄中只看到 beginTransaction…commitTransaction 的單一序列,這表示只有一個適當的交易執行所有發布操作。

在幕後,這些是事件順序

  1. 一旦調用使用 Transactional 註解的方法,交易攔截器就會透過 AOP 代理機制啟動,並使用自訂 KafkaTransactionManager 啟動新的交易。
  2. 當交易管理器開始交易時,交易管理器使用的資源 - 交易資源持有者(又名,從生產者工廠獲得的生產者)- 綁定到交易。
  3. 當方法調用 StreamBridge#send 方法時,底層 KafkaTemplate 將使用自訂 KafkaTransactionManager 建立的相同交易資源。由於交易已在進行中,因此它不會啟動另一個交易,而是發布發生在相同的交易生產者上。
  4. 隨著它調用更多 send 方法,它不會啟動新的交易。相反,它透過原始交易中使用的相同生產者資源發布。
  5. 當方法退出時,如果沒有錯誤,攔截器會要求交易管理器提交交易。如果任何發送操作或方法中的任何其他內容拋出異常,攔截器會要求交易管理器回滾交易。這些調用最終會命中 KafkaResourceHolder commitrollback 方法,這些方法調用 Kafka 生產者來提交或回滾交易。

由於在我們的範例中只有一個自訂 KafkaTransactionManager Bean,因此我們可以簡單地按原樣使用 Transactional 註解。另一方面,如果我們有多個自訂 KafkaTransactionManager Bean,我們必須使用正確的 Bean 名稱限定 @Transactional 註解。

如果我們在沒有自訂 KafkaTransactionManager 的情況下運行應用程式會怎樣?

如果我們移除自訂 KafkaTransactionManager 並運行此應用程式,您可以看到它建立五個個別交易,而不是單一交易。如果您啟用 TRACE 記錄,您可以在記錄中看到五個 beginTransaction…commitTransaction 序列。

您可以透過編寫交易式消費者 Spring Cloud Stream 應用程式並將其隔離層級設定為 read_committed 來驗證此行為。您可以使用 spring.cloud.stream.kafka.binder.configuration.isolation.level 屬性並將其值設定為 read_committed 來執行此操作。為了測試目的,新增 Thread.sleep 或其他等待機制,以模擬 for 迴圈中每次 StreamBridge#send 之後的行為。您可以看到,一旦每次 send 方法調用返回,無論等待時間長短,消費者都會收到資料,從而證明並非單一交易執行了整個操作,而是每次 send 都發生在自己的交易中。

我們看到每次發送的個別交易,因為 Transactional 註解沒有做我們期望它做的事情。Transactional 註解僅在有可用的交易管理器 Bean 且其生產者工廠與 Binder 使用的相同時才有效。

如果 Spring Boot 透過 spring.kafka.producer.transaction-id-prefix 偵測到組態中的 transaction-id-prefix 屬性,則會自動組態 KafkaTransactionManager。然而,由於我們處於 Spring Cloud Stream 情境中,因此我們必須使用 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix,因為這是我們向框架發送信號,以為 Binder 和關聯的交易式生產者工廠建立內部交易管理器的方式。如果我們提供適當的 spring.kafka 前綴,以便 Spring Boot 為我們自動組態 KakaTransactionManager 會怎樣?雖然這非常誘人,但它不起作用,因為自動組態的交易管理器使用的生產者工廠與 Binder 使用的不同。因此,我們必須提供一個自訂 KafkaTransactionManager,它使用與 Binder 相同的生產者工廠。這正是我們上面所做的。

在本部落格系列的下一部分中,我們將學習如何針對生產者和消費者啟動的交易與外部交易管理器同步。

取得 Spring 電子報

保持與 Spring 電子報的聯繫

訂閱

取得領先

VMware 提供培訓和認證,以加速您的進展。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

了解更多

即將到來的活動

查看 Spring 社群中所有即將到來的活動。

查看所有