建立批次服務

本指南將引導您完成建立基本批次驅動解決方案的流程。

您將建置的內容

您將建置一個服務,該服務從 CSV 試算表匯入資料,使用自訂程式碼轉換資料,並將最終結果儲存在資料庫中。

您需要的東西

如何完成本指南

與大多數 Spring 開始使用指南 一樣,您可以從頭開始並完成每個步驟,或者您可以跳過您已熟悉的基本設定步驟。無論哪種方式,您最終都會得到可運作的程式碼。

若要從頭開始,請繼續前往 從 Spring Initializr 開始

若要跳過基本步驟,請執行以下操作

完成時,您可以對照 gs-batch-processing/complete 中的程式碼檢查您的結果。

從 Spring Initializr 開始

您可以使用這個 預先初始化的專案,然後按一下 [Generate] (產生) 以下載 ZIP 檔案。此專案已設定為符合本教學課程中的範例。

若要手動初始化專案

  1. 導覽至 https://start.spring.io。這項服務會提取應用程式所需的所有相依性,並為您完成大部分設定。

  2. 選擇 Gradle 或 Maven 以及您想要使用的語言。本指南假設您選擇了 Java。

  3. 按一下 Dependencies (相依性) 並選取 Spring BatchHyperSQL Database (HyperSQL 資料庫)。

  4. 按一下 Generate (產生)。

  5. 下載產生的 ZIP 檔案,這是一個已設定您所選選項的 Web 應用程式封存檔。

如果您的 IDE 具有 Spring Initializr 整合,您可以從您的 IDE 完成此程序。
您也可以從 Github 分支專案,並在您的 IDE 或其他編輯器中開啟它。

業務資料

通常,您的客戶或業務分析師會提供試算表。對於這個簡單的範例,您可以在 src/main/resources/sample-data.csv 中找到一些虛構的資料

Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe

此試算表的每一列都包含名字和姓氏,並以逗號分隔。這是 Spring 可以處理而無需自訂的相當常見的模式。

接下來,您需要編寫 SQL 指令碼來建立表格以儲存資料。您可以在 src/main/resources/schema-all.sql 中找到這類指令碼

DROP TABLE people IF EXISTS;

CREATE TABLE people  (
    person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
    first_name VARCHAR(20),
    last_name VARCHAR(20)
);
Spring Boot 在啟動期間會自動執行 schema-@@platform@@.sql-all 是所有平台的預設值。

建立業務類別

現在您可以看到資料輸入和輸出的格式,您可以編寫程式碼來表示資料列,如下列範例 (來自 src/main/java/com/example/batchprocessing/Person.java) 所示

package com.example.batchprocessing;

public record Person(String firstName, String lastName) {

}

您可以透過建構函式,使用名字和姓氏來實例化 Person 記錄。

建立中繼處理器

批次處理中的常見範例是擷取資料、轉換資料,然後將其導出到其他地方。在這裡,您需要編寫一個簡單的轉換器,將名稱轉換為大寫。以下清單 (來自 src/main/java/com/example/batchprocessing/PersonItemProcessor.java) 顯示了如何執行此操作

package com.example.batchprocessing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.item.ItemProcessor;

public class PersonItemProcessor implements ItemProcessor<Person, Person> {

  private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

  @Override
  public Person process(final Person person) {
    final String firstName = person.firstName().toUpperCase();
    final String lastName = person.lastName().toUpperCase();

    final Person transformedPerson = new Person(firstName, lastName);

    log.info("Converting (" + person + ") into (" + transformedPerson + ")");

    return transformedPerson;
  }

}

PersonItemProcessor 實作了 Spring Batch 的 ItemProcessor 介面。這使得將程式碼連接到您稍後將在本指南中定義的批次作業變得容易。根據介面,您會收到一個傳入的 Person 物件,然後將其轉換為大寫的 Person

輸入和輸出類型不必相同。事實上,在讀取一個資料來源後,有時應用程式的資料流程需要不同的資料類型。

組合批次作業

現在您需要組合實際的批次作業。Spring Batch 提供了許多實用程式類別,可減少編寫自訂程式碼的需求。相反地,您可以專注於業務邏輯。

若要設定您的作業,您必須先建立一個 Spring @Configuration 類別,例如 src/main/java/com/example/batchprocessing/BatchConfiguration.java 中的以下範例。此範例使用記憶體型資料庫,這表示完成後資料將會消失。現在將以下 Bean 新增至您的 BatchConfiguration 類別,以定義讀取器、處理器和寫入器

@Bean
public FlatFileItemReader<Person> reader() {
  return new FlatFileItemReaderBuilder<Person>()
    .name("personItemReader")
    .resource(new ClassPathResource("sample-data.csv"))
    .delimited()
    .names("firstName", "lastName")
    .targetType(Person.class)
    .build();
}

@Bean
public PersonItemProcessor processor() {
  return new PersonItemProcessor();
}

@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
  return new JdbcBatchItemWriterBuilder<Person>()
    .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
    .dataSource(dataSource)
    .beanMapped()
    .build();
}

第一個程式碼區塊定義了輸入、處理器和輸出。

  • reader() 建立一個 ItemReader。它會尋找名為 sample-data.csv 的檔案,並解析每一行項目,其中包含足夠的資訊以將其轉換為 Person

  • processor() 建立您先前定義的 PersonItemProcessor 的實例,旨在將資料轉換為大寫。

  • writer(DataSource) 建立一個 ItemWriter。這個寫入器旨在 JDBC 目的地,並自動取得 Spring Boot 建立的 dataSource 副本。它包含插入單個 Person 所需的 SQL 陳述式,由 Java 記錄元件驅動。

最後一個區塊 (來自 src/main/java/com/example/batchprocessing/BatchConfiguration.java) 顯示了實際的作業設定

@Bean
public Job importUserJob(JobRepository jobRepository,Step step1, JobCompletionNotificationListener listener) {
  return new JobBuilder("importUserJob", jobRepository)
    .listener(listener)
    .start(step1)
    .build();
}

@Bean
public Step step1(JobRepository jobRepository, DataSourceTransactionManager transactionManager,
          FlatFileItemReader<Person> reader, PersonItemProcessor processor, JdbcBatchItemWriter<Person> writer) {
  return new StepBuilder("step1", jobRepository)
    .<Person, Person> chunk(3, transactionManager)
    .reader(reader)
    .processor(processor)
    .writer(writer)
    .build();
}

第一個方法定義了作業,第二個方法定義了單個步驟。作業由步驟建構而成,其中每個步驟可以包含讀取器、處理器和寫入器。

然後您列出每個步驟 (儘管此作業只有一個步驟)。作業結束,而 Java API 會產生一個完美設定的作業。

在步驟定義中,您定義了每次寫入多少資料。在本例中,它一次最多寫入三筆記錄。接下來,您使用先前注入的 Bean 來設定讀取器、處理器和寫入器。

chunk() 的前綴為 <Person,Person>,因為它是一個泛型方法。這表示每個「區塊」處理的輸入和輸出類型,並與 ItemReader<Person>ItemWriter<Person> 對齊。

批次設定的最後一部分是一種在作業完成時收到通知的方式。以下範例 (來自 src/main/java/com/example/batchprocessing/JobCompletionNotificationListener.java) 顯示了這樣一個類別

package com.example.batchprocessing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.jdbc.core.DataClassRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener implements JobExecutionListener {

  private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

  private final JdbcTemplate jdbcTemplate;

  public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
    this.jdbcTemplate = jdbcTemplate;
  }

  @Override
  public void afterJob(JobExecution jobExecution) {
    if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
      log.info("!!! JOB FINISHED! Time to verify the results");

      jdbcTemplate
          .query("SELECT first_name, last_name FROM people", new DataClassRowMapper<>(Person.class))
          .forEach(person -> log.info("Found <{{}}> in the database.", person));
    }
  }
}

JobCompletionNotificationListener 監聽作業何時為 BatchStatus.COMPLETED,然後使用 JdbcTemplate 來檢查結果。

讓應用程式可執行

雖然批次處理可以嵌入在 Web 應用程式和 WAR 檔案中,但下面示範的更簡單方法會建立獨立應用程式。您將所有內容封裝在單個可執行 JAR 檔案中,由良好的舊式 Java main() 方法驅動。

Spring Initializr 為您建立了一個應用程式類別。對於這個簡單的範例,它可以運作而無需進一步修改。以下清單 (來自 src/main/java/com/example/batchprocessing/BatchProcessingApplication.java) 顯示了應用程式類別

package com.example.batchprocessing;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BatchProcessingApplication {

  public static void main(String[] args) {
    System.exit(SpringApplication.exit(SpringApplication.run(BatchProcessingApplication.class, args)));
  }
}

@SpringBootApplication 是一個便利的註解,它新增了以下所有內容

  • @Configuration:將類別標記為應用程式內容的 Bean 定義來源。

  • @EnableAutoConfiguration:告知 Spring Boot 開始根據類別路徑設定、其他 Bean 和各種屬性設定來新增 Bean。例如,如果 spring-webmvc 在類別路徑上,則此註解會將應用程式標記為 Web 應用程式,並啟動關鍵行為,例如設定 DispatcherServlet

  • @ComponentScan:告知 Spring 在 com/example 套件中尋找其他元件、設定和服務,使其找到控制器。

main() 方法使用 Spring Boot 的 SpringApplication.run() 方法來啟動應用程式。您是否注意到沒有單行 XML?也沒有 web.xml 檔案。這個 Web 應用程式是 100% 純 Java,您不必處理設定任何管線或基礎結構。

請注意,SpringApplication.exit()System.exit() 可確保 JVM 在作業完成時結束。如需更多詳細資訊,請參閱 Spring Boot 參考文件中的應用程式結束章節

為了示範目的,程式碼會建立 JdbcTemplate、查詢資料庫,並列印出批次作業插入的人員姓名。

請注意應用程式如何不使用 @EnableBatchProcessing 註解。先前,@EnableBatchProcessing 可用於啟用 Spring Boot 的 Spring Batch 自動設定。從 Spring Boot v3.0 開始,不再需要此註解,應從想要使用 Spring Boot 自動設定的應用程式中移除。現在可以定義以 @EnableBatchProcessing 註解或擴充 Spring Batch 的 DefaultBatchConfiguration 的 Bean,以告知自動設定退回,允許應用程式完全控制 Spring Batch 的設定方式。

建置可執行 JAR

您可以使用 Gradle 或 Maven 從命令列執行應用程式。您也可以建置包含所有必要相依性、類別和資源的單個可執行 JAR 檔案,然後執行該檔案。建置可執行 JAR 檔案可以輕鬆地在整個開發生命週期、跨不同環境等情況下,將服務作為應用程式來交付、版本控制和部署。

如果您使用 Gradle,您可以使用 ./gradlew bootRun 來執行應用程式。或者,您可以使用 ./gradlew build 建置 JAR 檔案,然後如下所示執行 JAR 檔案

java -jar build/libs/gs-batch-processing-0.1.0.jar

如果您使用 Maven,您可以使用 ./mvnw spring-boot:run 來執行應用程式。或者,您可以使用 ./mvnw clean package 建置 JAR 檔案,然後如下所示執行 JAR 檔案

java -jar target/gs-batch-processing-0.1.0.jar
此處描述的步驟會建立可執行 JAR。您也可以 建置傳統 WAR 檔案

作業會為每個轉換的人員列印一行。在作業執行後,您也可以看到查詢資料庫的輸出。它應該類似於以下輸出

Converting (Person[firstName=Jill, lastName=Doe]) into (Person[firstName=JILL, lastName=DOE])
Converting (Person[firstName=Joe, lastName=Doe]) into (Person[firstName=JOE, lastName=DOE])
Converting (Person[firstName=Justin, lastName=Doe]) into (Person[firstName=JUSTIN, lastName=DOE])
Converting (Person[firstName=Jane, lastName=Doe]) into (Person[firstName=JANE, lastName=DOE])
Converting (Person[firstName=John, lastName=Doe]) into (Person[firstName=JOHN, lastName=DOE])
Found <{Person[firstName=JILL, lastName=DOE]}> in the database.
Found <{Person[firstName=JOE, lastName=DOE]}> in the database.
Found <{Person[firstName=JUSTIN, lastName=DOE]}> in the database.
Found <{Person[firstName=JANE, lastName=DOE]}> in the database.
Found <{Person[firstName=JOHN, lastName=DOE]}> in the database.

摘要

恭喜!您建置了一個批次作業,該作業從試算表擷取資料、處理資料,並將其寫入資料庫。

另請參閱

以下指南也可能有所幫助

想要編寫新的指南或貢獻現有的指南嗎?請查看我們的 貢獻指南

所有指南均以 ASLv2 授權發佈程式碼,並以 姓名標示-禁止改作創用 CC 授權 發佈寫作內容。

取得程式碼