Google Code Prettify

2018年5月23日 星期三

Spring Batch: getting started

雖然很多系統都有批次處理的需求,但是,spring batch 似乎並沒有很流行? 這裡看一下怎麼開始寫第一支 spring batch 的程式。


如上圖,程式會讀取一個檔案 people.csv 的資料,經過處理寫入資料庫。people.csv 的內容如下:
中本聰,47,大和民族,波士頓
很簡單的一個 csv 檔,只有一筆資料,四個欄位,分別記載著「名字」、「年齡」、「民族」、「居住地」。資料庫的 table 如下:

有五個欄位,除了第一個欄位 ID 是資料庫自行編碼的序號外,分別為 name、age、nation、address,用來記錄上面四個欄位的資料。現在開始看一下程式怎麼寫?
  • build.gradle
buildscript {
    ext {
        springBootVersion = '2.0.2.RELEASE'
    }
    repositories {
      mavenCentral()
      jcenter()
      maven { url "https://repo.spring.io/libs-release" }
      maven { url "http://maven.springframework.org/milestone" }
      maven { url "http://repo.maven.apache.org/maven2" }
      maven { url "http://repo1.maven.org/maven2/" }
      maven { url "http://amateras.sourceforge.jp/mvn/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'idv.steven.mybatch'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
     mavenCentral()
     jcenter()
     maven { url "https://repo.spring.io/libs-release" }
     maven { url "http://maven.springframework.org/milestone" }
     maven { url "http://repo.maven.apache.org/maven2" }
     maven { url "http://repo1.maven.org/maven2/" }
     maven { url "http://amateras.sourceforge.jp/mvn/" }
}

configurations.all {
    //sping boot 預設使用logback,先移除
    exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging'
}

dependencies {
    def log4j2Version = '2.7'

    compile('org.springframework.boot:spring-boot-starter-batch')
    compile('org.springframework.boot:spring-boot-starter-data-jpa')
    compileOnly('org.projectlombok:lombok')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('org.springframework.batch:spring-batch-test')
 
    compile fileTree(dir: 'libs', include: ['*.jar'])
 
    compile group: 'javax.validation', name: 'validation-api', version: '2.0.1.Final'
    compile group: 'org.hibernate', name: 'hibernate-validator', version: '5.3.6.Final'
    compile group: 'org.glassfish.web', name: 'el-impl', version: '2.2'

    compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: "${log4j2Version}"
    compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: "${log4j2Version}"
    compile group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: "${log4j2Version}" 
}
我使用 Gradle 管理 jar 檔,所以需要如上檔案,最重要的是紅色那兩行,分別引入 spring batch 及 spring data,這樣程式可以進行批次處理及寫入資料庫了。
  • application.properties
spring.datasource.url=jdbc:oracle:thin:@192.168.51.168:1521:testdb
spring.datasource.username=testuser
spring.datasource.password=testpass
這是 spring boot 預設的設定檔,在裡面寫入如上內容,spring boot 就會幫我們做好所有資料庫相關初始化的工作。
  • spring boot application
@SpringBootApplication
public class MyBatchApplication {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(MyBatchApplication.class, args);
    }
}
spring boot 程式就行這裡開始,如果還不清楚,可以參考「第一個 spring boot web 程式 (使用 embedded tomcat)」。
  • Person
@Entity
@Table(name="PERSON")
@Data
public class Person {
    @Id
    @GeneratedValue(strategy=GenerationType.SEQUENCE, generator="SEQ_PERSON")
    @SequenceGenerator(name="SEQ_PERSON", sequenceName="SEQ_PERSON", allocationSize=1)
    @Column(name="ID", nullable=false)
    private Long id;
 
    private String name;
 
    private Long age;
 
    private String nation;
 
    private String address;
}
JPA 的 entity 定義如上,對應前面的 table PERSON。
  • PersonDao
public interface PersonDao extends CrudRepository<Person, Long> {

}
有 Entity 之後,自然要寫個 DAO 來存取,如上,這是 spring data 提供的簡便方法,如果不了解,可以參考「spring data: CrudRepository 自動生成」。
  • BatchConfig
@Configuration
@EnableBatchProcessing
public class BatchConfig {

}
只要在 @Configuration 所在類別加入 @EnableBatchProcessing,spring batch 會自動設定好基本的設定,並建立起以下幾個 bean: (括號中為 bean name)
  1. JobRepository (jobRepository)
  2. JobLauncher  (jobLauncher)
  3. JobRegistry (jobRegistry)
  4. PlatformTransactionManager  (transactionManager)
  5. JobBuilderFactory (jobBuilders)
  6. StepBuilderFactory (stepBuilders)
並且會將執行過程及結果寫入底下六個 tables:
  1. BATCH_JOB_EXECUTION
  2. BATCH_JOB_EXECUTION_CONTEXT
  3. BATCH_JOB_EXECUTION_PARAMS
  4. BATCH_JOB_INSTANCE
  5. BATCH_STEP_EXECUTION
  6. BATCH_STEP_EXECUTION_CONTEXT
如果因為測試時不想寫出那麼多 log,或是基於各種考量,不希望 spring batch 記下這些 log,可以改成如下,使用預設的 spring batch 設定 (繼承 DefaultBatchConfigurer),但是在注入 DataSource 時刻意將它忽略,這樣 spring batch 就不會寫 log 到資料庫了。
@Configuration
@EnableBatchProcessing
public class BatchConfig extends DefaultBatchConfigurer {
    @Override
    public void setDataSource(DataSource dataSource) {
        // override to do not set datasource even if a datasource exist.
        // initialize will use a Map based JobRepository (instead of database)
    }
}
  • PersonJob
@Configuration
public class PersonJob {
    @Autowired
    private CsvJobListener listener;
 
    @Bean
    public Job importJob(JobBuilderFactory jobs, Step s1) {
        return jobs.get("importJob")
          .incrementer(new RunIdIncrementer())
          .flow(s1)
          .end()
          .listener(listener)
          .build();
    }
    
    @Bean
    public Step step1(
        StepBuilderFactory stepBuilderFactory, 
        ItemReader<Person> reader, 
        ItemWriter<Person> writer,
        ItemProcessor<Person,Person> processor) {

        return stepBuilderFactory
          .get("step1")
          .<Person, Person>chunk(500)
          .reader(reader)
          .processor(processor)
          .writer(writer)
          .build();
    }
    
    @Bean
    public FlatFileItemReader<Person> reader() throws Exception {
        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
        reader.setResource(new ClassPathResource("people.csv"));
        reader.setLineMapper(
            new DefaultLineMapper<Person>() {{
                setLineTokenizer(new DelimitedLineTokenizer() {{
                    setNames(new String[] { "name", "age", "nation", "address" });
                }});
        
                setFieldSetMapper(new PersonFieldSetMapper());
            }});

        return reader;
    }
}

這裡定義了三個 bean,第一個是定義了一個 Job; 第二個 bean 是定義 Job 中的步驟,因為這個 Job 只有一個步驟,就只有一個 Step bean,其中 500 表示每 500 筆存一次,spring batch 不會每讀一筆就存一次,這樣太沒效率了; 第三個 bean 用來讀取 CSV 檔,可以看到欄位與的 Person 類別一致,當 spring batch 讀取一行 CSV 的資料,就會產生一個 Person object 來儲存。
  • PersonItemWriter
@Component
public class PersonItemWriter implements ItemWriter {
    @Autowired
    private PersonDao daoPerson;

    @Override
    public void write(List items) throws Exception {
        items.forEach(i -> {
            daoPerson.save(i);
        }); 
    }
}
寫入資料庫,我簡單的用 spring data 提供的 DAO 寫入。
  • PersonFieldSetMapper
@Component
public class PersonFieldSetMapper implements FieldSetMapper {

    @Override
    public Person mapFieldSet(FieldSet fieldSet) throws BindException {
        Person person = new Person();
        person.setName(fieldSet.readString("name"));
        person.setAge(fieldSet.readLong("age"));
        person.setNation(fieldSet.readString("nation"));
        person.setAddress(fieldSet.readString("address"));

        return person;
    }
}
由 reader 讀入的資料在這裡轉換成 Person object。
  • PersonItemProcessor
@Component
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
    @Override
    public Person process(Person item) throws ValidationException {
        if (item.getNation().equals("大和民族")) {
            item.setNation("01");
        } else {
            item.setNation("02");
        }
        return item;
    }
}
由 csv 讀入資料後,會傳到這個類別,如果要做一些處理,可以在這裡進行,要特別注意傳回值如果是  null,則 spring batch 不會呼叫 writer,這表示在這裡我們可以篩選進來的資料,不符合需求的就不寫入。要特別注意的是,spring batch 在 ItemProcessor 也會計算次數,看看 ItemReader 送來的資料筆數是否已經等於前面設定的 chunk 筆數,等於了才會送給 ItemWriter,流程會像這樣 …
  • CsvJobListener
@Component
@Slf4j
public class CsvJobListener implements JobExecutionListener {
    private long startTime;
    private long endTime;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = System.currentTimeMillis();
        log.info("任務處理開始");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        endTime = System.currentTimeMillis();
        log.info("任務處理結束");
        log.info("耗時:" + (endTime - startTime) + "ms");
    }
}
Listener 並非必要的,這裡用它來寫一些 log,好方便我們觀察。
  • 執行結果


執行後查詢一下資料庫,應該可以看到已經寫入了。