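As shown in the figure above, the program reads data from a file, people.csv, processes it, and writes it to the database. The content of people.csv is as follows: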
中本聰,47,大和民族,波士頓
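The database table has five columns. The first column, ID, is a sequence number generated by the database; the other four (name, age, nation, address) store the four fields of the CSV line above. Let's look at how the program is written.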
- build.gradle
buildscript {
    ext {
        springBootVersion = '2.0.2.RELEASE'
    }
    repositories {
        mavenCentral()
        jcenter()
        maven { url "https://repo.spring.io/libs-release" }
        maven { url "http://maven.springframework.org/milestone" }
        maven { url "http://repo.maven.apache.org/maven2" }
        maven { url "http://repo1.maven.org/maven2/" }
        maven { url "http://amateras.sourceforge.jp/mvn/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'idv.steven.mybatch'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
    jcenter()
    maven { url "https://repo.spring.io/libs-release" }
    maven { url "http://maven.springframework.org/milestone" }
    maven { url "http://repo.maven.apache.org/maven2" }
    maven { url "http://repo1.maven.org/maven2/" }
    maven { url "http://amateras.sourceforge.jp/mvn/" }
}

configurations.all {
    // Spring Boot uses Logback by default; exclude it first
    exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging'
}

dependencies {
    def log4j2Version = '2.7'

    compile('org.springframework.boot:spring-boot-starter-batch')
    compile('org.springframework.boot:spring-boot-starter-data-jpa')
    compileOnly('org.projectlombok:lombok')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('org.springframework.batch:spring-batch-test')

    compile fileTree(dir: 'libs', include: ['*.jar'])
    compile group: 'javax.validation', name: 'validation-api', version: '2.0.1.Final'
    compile group: 'org.hibernate', name: 'hibernate-validator', version: '5.3.6.Final'
    compile group: 'org.glassfish.web', name: 'el-impl', version: '2.2'

    compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: "${log4j2Version}"
    compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: "${log4j2Version}"
    compile group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: "${log4j2Version}"
}
- application.properties
spring.datasource.url=jdbc:oracle:thin:@192.168.51.168:1521:testdb
spring.datasource.username=testuser
spring.datasource.password=testpass
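- Spring Boot application
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MyBatchApplication {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(MyBatchApplication.class, args);
    }
}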
- Person
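import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.SequenceGenerator;
import javax.persistence.Table;

import lombok.Data;

@Entity
@Table(name="PERSON")
@Data
public class Person {

    // Primary key, filled in from the database sequence SEQ_PERSON
    @Id
    @GeneratedValue(strategy=GenerationType.SEQUENCE, generator="SEQ_PERSON")
    @SequenceGenerator(name="SEQ_PERSON", sequenceName="SEQ_PERSON", allocationSize=1)
    @Column(name="ID", nullable=false)
    private Long id;

    private String name;
    private Long age;
    private String nation;
    private String address;
}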
- PersonDao
public interface PersonDao extends CrudRepository<Person, Long> { }
- BatchConfig
@Configuration
@EnableBatchProcessing
public class BatchConfig {
}
With @EnableBatchProcessing in place, Spring Batch makes the following beans available for injection:
- JobRepository (jobRepository)
- JobLauncher (jobLauncher)
- JobRegistry (jobRegistry)
- PlatformTransactionManager (transactionManager)
- JobBuilderFactory (jobBuilders)
- StepBuilderFactory (stepBuilders)
Spring Batch also records the state of each job and step execution in the following database tables:
- BATCH_JOB_EXECUTION
- BATCH_JOB_EXECUTION_CONTEXT
- BATCH_JOB_EXECUTION_PARAMS
- BATCH_JOB_INSTANCE
- BATCH_STEP_EXECUTION
- BATCH_STEP_EXECUTION_CONTEXT
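The BATCH_* tables listed above have to exist before the job runs. As a hedged configuration fragment, assuming the Spring Boot 2.0.x version declared in build.gradle, the property below asks Spring Boot to create them at startup. By default this only happens for embedded databases, so it would not apply to the Oracle database used here. Alternatively, the Oracle schema script shipped inside spring-batch-core can be run by hand.

```properties
# Assumption: Spring Boot 2.0.x property name; later Boot versions renamed it.
spring.batch.initialize-schema=always
```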
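If you don't want this much logging during testing, or for any other reason don't want Spring Batch to record these logs, you can change the configuration as follows. It uses the default Spring Batch setup (by extending DefaultBatchConfigurer) but deliberately ignores the DataSource when it is injected, so Spring Batch no longer writes its logs to the database.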
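import javax.sql.DataSource;

import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchConfig extends DefaultBatchConfigurer {

    @Override
    public void setDataSource(DataSource dataSource) {
        // Intentionally empty: do not set the DataSource even if one exists.
        // initialize() will then use a Map-based JobRepository instead of the database.
    }
}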
- PersonJob
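import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class PersonJob {

    @Autowired
    private CsvJobListener listener;

    // The job: a single step, with a listener that logs start, end, and elapsed time
    @Bean
    public Job importJob(JobBuilderFactory jobs, Step s1) {
        return jobs.get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(s1)
                .end()
                .listener(listener)
                .build();
    }

    // The only step: read, process, then write in chunks of 500 items
    @Bean
    public Step step1(
            StepBuilderFactory stepBuilderFactory,
            ItemReader<Person> reader,
            ItemWriter<Person> writer,
            ItemProcessor<Person,Person> processor) {
        return stepBuilderFactory
                .get("step1")
                .<Person, Person>chunk(500)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    // Reads people.csv from the classpath and maps each line to a Person
    @Bean
    public FlatFileItemReader<Person> reader() throws Exception {
        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
        reader.setResource(new ClassPathResource("people.csv"));
        reader.setLineMapper(
            new DefaultLineMapper<Person>() {{
                setLineTokenizer(new DelimitedLineTokenizer() {{
                    setNames(new String[] { "name", "age", "nation", "address" });
                }});
                setFieldSetMapper(new PersonFieldSetMapper());
            }});
        return reader;
    }
}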
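Three beans are defined here. The first defines a Job. The second defines a step of that Job; because this Job has only one step, there is only one Step bean. The 500 is the chunk size: items are saved once every 500 items, because saving after every single item read would be too inefficient. The third bean reads the CSV file. Its field names match the properties of the Person class, and each time Spring Batch reads a line of the CSV it creates a Person object to hold the data.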
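To make the chunk size concrete, here is a minimal plain-Java sketch of the read, process, write-in-batches loop. It is not Spring Batch's implementation: ChunkSketch and runStep are hypothetical names, and transactions, restarts, and error handling are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class ChunkSketch {

    /**
     * Reads items one at a time, processes each one, and hands them to the
     * writer in batches of at most chunkSize. Returns the number of writes.
     * Hypothetical helper for illustration only, not a Spring Batch API.
     */
    public static <I, O> int runStep(Iterator<I> reader,
                                     Function<I, O> processor,
                                     Consumer<List<O>> writer,
                                     int chunkSize) {
        int writes = 0;
        List<O> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(processor.apply(reader.next()));
            if (chunk.size() == chunkSize) {
                writer.accept(new ArrayList<>(chunk)); // one write per full chunk
                chunk.clear();
                writes++;
            }
        }
        if (!chunk.isEmpty()) {
            writer.accept(new ArrayList<>(chunk)); // final, partially filled chunk
            writes++;
        }
        return writes;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 0; i < 1200; i++) {
            input.add(i);
        }
        List<Integer> chunkSizes = new ArrayList<>();
        int writes = ChunkSketch.<Integer, Integer>runStep(
                input.iterator(),
                i -> i * 2,
                chunk -> chunkSizes.add(chunk.size()),
                500);
        // prints "3 writes, sizes [500, 500, 200]"
        System.out.println(writes + " writes, sizes " + chunkSizes);
    }
}
```

With 1200 input items and a chunk size of 500, the writer is called three times instead of 1200 times, which is the saving the chunk setting is after.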
- PersonItemWriter
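import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class PersonItemWriter implements ItemWriter<Person> {

    @Autowired
    private PersonDao daoPerson;

    // Called once per chunk with all items of that chunk
    @Override
    public void write(List<? extends Person> items) throws Exception {
        items.forEach(i -> {
            daoPerson.save(i);
        });
    }
}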
- PersonFieldSetMapper
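import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.stereotype.Component;
import org.springframework.validation.BindException;

@Component
public class PersonFieldSetMapper implements FieldSetMapper<Person> {

    // Copies the named fields of one CSV line into a new Person
    @Override
    public Person mapFieldSet(FieldSet fieldSet) throws BindException {
        Person person = new Person();
        person.setName(fieldSet.readString("name"));
        person.setAge(fieldSet.readLong("age"));
        person.setNation(fieldSet.readString("nation"));
        person.setAddress(fieldSet.readString("address"));
        return person;
    }
}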
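As a rough illustration of what the tokenizer and the field-set mapper do together, the plain-Java sketch below splits one line on commas and pairs each token with a column name. LineMappingSketch and tokenize are hypothetical names, and the splitting is deliberately simplified: unlike the real DelimitedLineTokenizer, it does not handle quoted fields.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LineMappingSketch {

    // Column names, in the same order as the setNames(...) call in the reader bean.
    static final String[] NAMES = { "name", "age", "nation", "address" };

    /** Splits one comma-delimited line and pairs each token with its column name. */
    public static Map<String, String> tokenize(String line) {
        String[] tokens = line.split(",", -1); // -1 keeps trailing empty fields
        if (tokens.length != NAMES.length) {
            throw new IllegalArgumentException(
                    "Expected " + NAMES.length + " fields but found " + tokens.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < NAMES.length; i++) {
            fields.put(NAMES[i], tokens[i].trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> fields = tokenize("中本聰,47,大和民族,波士頓");
        // Long.parseLong plays the role of fieldSet.readLong("age") in the mapper.
        long age = Long.parseLong(fields.get("age"));
        System.out.println(fields.get("name") + " / " + age + " / "
                + fields.get("nation") + " / " + fields.get("address"));
    }
}
```

Looking fields up by name rather than by position is what lets the mapper stay readable if the column order in the file ever changes: only the list of names has to be updated.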
- PersonItemProcessor
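import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
// ValidationException: the original does not show which class is imported here.

@Component
public class PersonItemProcessor implements ItemProcessor<Person, Person> {

    // Replaces the nation name with a code: "01" for 大和民族, "02" for everything else
    @Override
    public Person process(Person item) throws ValidationException {
        if (item.getNation().equals("大和民族")) {
            item.setNation("01");
        } else {
            item.setNation("02");
        }
        return item;
    }
}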
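The mapping rule in the processor can be tried on its own with a small plain-Java sketch. NationCodeSketch and nationCode are hypothetical names. One difference from the processor above is assumed on purpose: the comparison is written constant-first, so a null nation yields "02" instead of throwing a NullPointerException.

```java
public class NationCodeSketch {

    /** Same rule as the processor: "大和民族" maps to "01", anything else to "02". */
    public static String nationCode(String nation) {
        // Constant-first equals: a null argument simply falls through to "02".
        return "大和民族".equals(nation) ? "01" : "02";
    }

    public static void main(String[] args) {
        System.out.println(nationCode("大和民族"));
        System.out.println(nationCode("other"));
        System.out.println(nationCode(null));
    }
}
```

For the sample line in people.csv, the nation column would therefore be stored as 01.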
- CsvJobListener
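import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class CsvJobListener implements JobExecutionListener {

    private long startTime;
    private long endTime;

    // Record the start time before the job runs
    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = System.currentTimeMillis();
        log.info("Job started");
    }

    // Log the elapsed time once the job has finished
    @Override
    public void afterJob(JobExecution jobExecution) {
        endTime = System.currentTimeMillis();
        log.info("Job finished");
        log.info("Elapsed: " + (endTime - startTime) + "ms");
    }
}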
- Execution result
After running the program, query the database; you should see that the data has been written.