Google Code Prettify

2018年5月23日 星期三

Spring Batch: getting started

雖然很多系統都有批次處理的需求,但是,spring batch 似乎並沒有很流行? 這裡看一下怎麼開始寫第一支 spring batch 的程式。


如上圖,程式會讀取一個檔案 people.csv 的資料,經過處理寫入資料庫。people.csv 的內容如下:
中本聰,47,大和民族,波士頓
很簡單的一個 csv 檔,只有一筆資料,四個欄位,分別記載著「名字」、「年齡」、「民族」、「居住地」。資料庫的 table 如下:

有五個欄位,除了第一個欄位 ID 是資料庫自行編碼的序號外,分別為 name、age、nation、address,用來記錄上面四個欄位的資料。現在開始看一下程式怎麼寫?
  • build.gradle
buildscript {
    ext {
        springBootVersion = '2.0.2.RELEASE'
    }
    repositories {
      mavenCentral()
      jcenter()
      maven { url "https://repo.spring.io/libs-release" }
      maven { url "http://maven.springframework.org/milestone" }
      maven { url "http://repo.maven.apache.org/maven2" }
      maven { url "http://repo1.maven.org/maven2/" }
      maven { url "http://amateras.sourceforge.jp/mvn/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'idv.steven.mybatch'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
     mavenCentral()
     jcenter()
     maven { url "https://repo.spring.io/libs-release" }
     maven { url "http://maven.springframework.org/milestone" }
     maven { url "http://repo.maven.apache.org/maven2" }
     maven { url "http://repo1.maven.org/maven2/" }
     maven { url "http://amateras.sourceforge.jp/mvn/" }
}

configurations.all {
    //sping boot 預設使用logback,先移除
    exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging'
}

dependencies {
    def log4j2Version = '2.7'

    compile('org.springframework.boot:spring-boot-starter-batch')
    compile('org.springframework.boot:spring-boot-starter-data-jpa')
    compileOnly('org.projectlombok:lombok')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('org.springframework.batch:spring-batch-test')
 
    compile fileTree(dir: 'libs', include: ['*.jar'])
 
    compile group: 'javax.validation', name: 'validation-api', version: '2.0.1.Final'
    compile group: 'org.hibernate', name: 'hibernate-validator', version: '5.3.6.Final'
    compile group: 'org.glassfish.web', name: 'el-impl', version: '2.2'

    compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: "${log4j2Version}"
    compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: "${log4j2Version}"
    compile group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: "${log4j2Version}" 
}
我使用 Gradle 管理 jar 檔,所以需要如上檔案,最重要的是紅色那兩行,分別引入 spring batch 及 spring data,這樣程式可以進行批次處理及寫入資料庫了。
  • application.properties
spring.datasource.url=jdbc:oracle:thin:@192.168.51.168:1521:testdb
spring.datasource.username=testuser
spring.datasource.password=testpass
這是 spring boot 預設的設定檔,在裡面寫入如上內容,spring boot 就會幫我們做好所有資料庫相關初始化的工作。
  • spring boot application
@SpringBootApplication
public class MyBatchApplication {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(MyBatchApplication.class, args);
    }
}
spring boot 程式就行這裡開始,如果還不清楚,可以參考「第一個 spring boot web 程式 (使用 embedded tomcat)」。
  • Person
@Entity
@Table(name="PERSON")
@Data
public class Person {
    @Id
    @GeneratedValue(strategy=GenerationType.SEQUENCE, generator="SEQ_PERSON")
    @SequenceGenerator(name="SEQ_PERSON", sequenceName="SEQ_PERSON", allocationSize=1)
    @Column(name="ID", nullable=false)
    private Long id;
 
    private String name;
 
    private Long age;
 
    private String nation;
 
    private String address;
}
JPA 的 entity 定義如上,對應前面的 table PERSON。
  • PersonDao
public interface PersonDao extends CrudRepository<Person, Long> {

}
有 Entity 之後,自然要寫個 DAO 來存取,如上,這是 spring data 提供的簡便方法,如果不了解,可以參考「spring data: CrudRepository 自動生成」。
  • BatchConfig
@Configuration
@EnableBatchProcessing
public class BatchConfig {

}
只要在 @Configuration 所在類別加入 @EnableBatchProcessing,spring batch 會自動設定好基本的設定,並建立起以下幾個 bean: (括號中為 bean name)
  1. JobRepository (jobRepository)
  2. JobLauncher  (jobLauncher)
  3. JobRegistry (jobRegistry)
  4. PlatformTransactionManager  (transactionManager)
  5. JobBuilderFactory (jobBuilders)
  6. StepBuilderFactory (stepBuilders)
並且會將執行過程及結果寫入底下六個 tables:
  1. BATCH_JOB_EXECUTION
  2. BATCH_JOB_EXECUTION_CONTEXT
  3. BATCH_JOB_EXECUTION_PARAMS
  4. BATCH_JOB_INSTANCE
  5. BATCH_STEP_EXECUTION
  6. BATCH_STEP_EXECUTION_CONTEXT
如果因為測試時不想寫出那麼多 log,或是基於各種考量,不希望 spring batch 記下這些 log,可以改成如下,使用預設的 spring batch 設定 (繼承 DefaultBatchConfigurer),但是在注入 DataSource 時刻意將它忽略,這樣 spring batch 就不會寫 log 到資料庫了。
@Configuration
@EnableBatchProcessing
public class BatchConfig extends DefaultBatchConfigurer {
    @Override
    public void setDataSource(DataSource dataSource) {
        // override to do not set datasource even if a datasource exist.
        // initialize will use a Map based JobRepository (instead of database)
    }
}
  • PersonJob
@Configuration
public class PersonJob {
    @Autowired
    private CsvJobListener listener;
 
    @Bean
    public Job importJob(JobBuilderFactory jobs, Step s1) {
        return jobs.get("importJob")
          .incrementer(new RunIdIncrementer())
          .flow(s1)
          .end()
          .listener(listener)
          .build();
    }
    
    @Bean
    public Step step1(
        StepBuilderFactory stepBuilderFactory, 
        ItemReader<Person> reader, 
        ItemWriter<Person> writer,
        ItemProcessor<Person,Person> processor) {

        return stepBuilderFactory
          .get("step1")
          .<Person, Person>chunk(500)
          .reader(reader)
          .processor(processor)
          .writer(writer)
          .build();
    }
    
    @Bean
    public FlatFileItemReader<Person> reader() throws Exception {
        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
        reader.setResource(new ClassPathResource("people.csv"));
        reader.setLineMapper(
            new DefaultLineMapper<Person>() {{
                setLineTokenizer(new DelimitedLineTokenizer() {{
                    setNames(new String[] { "name", "age", "nation", "address" });
                }});
        
                setFieldSetMapper(new PersonFieldSetMapper());
            }});

        return reader;
    }
}

這裡定義了三個 bean,第一個是定義了一個 Job; 第二個 bean 是定義 Job 中的步驟,因為這個 Job 只有一個步驟,就只有一個 Step bean,其中 500 表示每 500 筆存一次,spring batch 不會每讀一筆就存一次,這樣太沒效率了; 第三個 bean 用來讀取 CSV 檔,可以看到欄位與的 Person 類別一致,當 spring batch 讀取一行 CSV 的資料,就會產生一個 Person object 來儲存。
  • PersonItemWriter
@Component
public class PersonItemWriter implements ItemWriter {
    @Autowired
    private PersonDao daoPerson;

    @Override
    public void write(List items) throws Exception {
        items.forEach(i -> {
            daoPerson.save(i);
        }); 
    }
}
寫入資料庫,我簡單的用 spring data 提供的 DAO 寫入。
  • PersonFieldSetMapper
@Component
public class PersonFieldSetMapper implements FieldSetMapper {

    @Override
    public Person mapFieldSet(FieldSet fieldSet) throws BindException {
        Person person = new Person();
        person.setName(fieldSet.readString("name"));
        person.setAge(fieldSet.readLong("age"));
        person.setNation(fieldSet.readString("nation"));
        person.setAddress(fieldSet.readString("address"));

        return person;
    }
}
由 reader 讀入的資料在這裡轉換成 Person object。
  • PersonItemProcessor
@Component
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
    @Override
    public Person process(Person item) throws ValidationException {
        if (item.getNation().equals("大和民族")) {
            item.setNation("01");
        } else {
            item.setNation("02");
        }
        return item;
    }
}
由 csv 讀入資料後,會傳到這個類別,如果要做一些處理,可以在這裡進行,要特別注意傳回值如果是  null,則 spring batch 不會呼叫 writer,這表示在這裡我們可以篩選進來的資料,不符合需求的就不寫入。要特別注意的是,spring batch 在 ItemProcessor 也會計算次數,看看 ItemReader 送來的資料筆數是否已經等於前面設定的 chunk 筆數,等於了才會送給 ItemWriter,流程會像這樣 …
  • CsvJobListener
@Component
@Slf4j
public class CsvJobListener implements JobExecutionListener {
    private long startTime;
    private long endTime;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = System.currentTimeMillis();
        log.info("任務處理開始");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        endTime = System.currentTimeMillis();
        log.info("任務處理結束");
        log.info("耗時:" + (endTime - startTime) + "ms");
    }
}
Listener 並非必要的,這裡用它來寫一些 log,好方便我們觀察。
  • 執行結果


執行後查詢一下資料庫,應該可以看到已經寫入了。

5 則留言:

  1. 有個疑問
    在BatchConfig 裡你將setDataSource的方法override
    但是卻沒有設定spring 注入的datasource
    這樣會造成無法取得資料庫連線吧?
    也就是repository實際上沒有真的save entity

    回覆刪除
    回覆
    1. writer那邊有personDao.save(),後面繼承了CrudRepository。
      基本的CRUD由spring處理掉了。

      刪除
  2. 請問一下最後一段執行指的是 spring boot application? spring batch 官網提供兩種執行方式, 一個是 commandline 另一個是 http request, 好奇你這邊是怎麼執行 job 的?

    回覆刪除
    回覆
    1. @SpringBootApplication

      command line

      刪除
    2. 查到原因了給你參考, 主因是 spring boot application 在啟動後, @EnableAutoConfiguration 在讀 spring batch 時 default 就會先執行一次 configured job, 若後續要在 trigger job 還是要配置 commandline 或 http request 接口

      刪除