Google Code Prettify

顯示具有 spring batch 標籤的文章。 顯示所有文章
顯示具有 spring batch 標籤的文章。 顯示所有文章

2019年1月10日 星期四

Spring Batch: multiple datasource

在 spring boot 環境中,使用 spring batch,如果需要連線兩個不同資料庫,要怎麼做呢?
  • 在 application.properties 定義多個 dataSource
batch.datasource.driver-class-name=oracle.jdbc.OracleDriver
batch.datasource.url=jdbc:oracle:thin:@192.168.50.12:1521:testDB1
batch.datasource.username=dbuser1
batch.datasource.password=p@ssword

db1.datasource.driver-class-name=oracle.jdbc.OracleDriver
db1.datasource.url=jdbc:oracle:thin:@192.168.50.12:1521:testDB1
db1.datasource.username=dbuser1
db1.datasource.password=p@ssword

db2.datasource.driver-class-name=oracle.jdbc.OracleDriver
db2.datasource.url=jdbc:oracle:thin:@192.168.51.168:1521:testDB2
db2.datasource.username=dbuser2
db2.datasource.password=p@ssword
如上,db1、db2 分別連到不同的兩個資料庫,但是,除此之外,還要設定一個給 spring batch 使用,所以有 batch (第一個) 的 dataSource 設定,雖然 spring batch 與 db1 使用同一個資料庫,還是要另外設定 dataSource,否則在執行到 spring batch 的 tasklet 時,如果裡面有存取到 db1、db2 的資料庫時,會抓不到 transaction。
  • 設定多個 dataSource
@Configuration
public class MultipleDataSourceConfig {
    @Autowired
    private Environment env;
 
    @Bean(name = "batchDatasource")
    @ConfigurationProperties(prefix = "batch.datasource")
    public DataSource batchDataSource() {
        return DataSourceBuilder.create()
           .driverClassName(env.getProperty("batch.datasource.driver-class-name"))
           .url(env.getProperty("batch.datasource.url"))
           .username(env.getProperty("batch.datasource.username"))
           .password(env.getProperty("batch.datasource.password"))
           .build();
    }

    @Bean(name = "db1Datasource")
    @Primary
    @ConfigurationProperties(prefix = "db1.datasource")
    public DataSource db1DataSource() {
        return DataSourceBuilder.create()
           .driverClassName(env.getProperty("db1.datasource.driver-class-name"))
           .url(env.getProperty("db1.datasource.url"))
           .username(env.getProperty("db1.datasource.username"))
           .password(env.getProperty("db1.datasource.password"))
           .build();
    }
 
    @Bean(name = "db2Datasource")
    @ConfigurationProperties(prefix = "db2.datasource")
    public DataSource db2DataSource() {
        return DataSourceBuilder.create()
           .driverClassName(env.getProperty("db2.datasource.driver-class-name"))
           .url(env.getProperty("db2.datasource.url"))
           .username(env.getProperty("db2.datasource.username"))
           .password(env.getProperty("db2.datasource.password"))
           .build();
    }
}
  • 為每個 dataSource 設定相關的 transactionManager
    • BatchDbConfig.java
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
   entityManagerFactoryRef = "batchEntityManagerFactory",
   basePackages = { "idv.steven.batch.dao" },
   transactionManagerRef = "batchTransactionManager"
)
public class BatchDbConfig {
 @Autowired
 @Qualifier("batchDatasource")
 private DataSource datasource;
 
    private JpaVendorAdapter jpaVendorAdapter() {
        HibernateJpaVendorAdapter adapter = new HibernateJpaVendorAdapter();

        adapter.setDatabase(Database.ORACLE);
        adapter.setShowSql(true);
        adapter.setGenerateDdl(false);
        adapter.setDatabasePlatform("org.hibernate.dialect.Oracle12cDialect");

        return adapter;
    }
    
    /**
     * 載入 Entity
     * @return
     */
    @Bean
    public LocalContainerEntityManagerFactoryBean batchEntityManagerFactory() {
        LocalContainerEntityManagerFactoryBean emf = new LocalContainerEntityManagerFactoryBean();
        emf.setDataSource(datasource);
        emf.setPackagesToScan(new String[] { "idv.steven.batch.entity" });
        emf.setJpaVendorAdapter(this.jpaVendorAdapter());
        emf.setSharedCacheMode(SharedCacheMode.NONE);

        return emf;
    }
    
    @Bean(name = "batchTransactionManager")
    public PlatformTransactionManager batchTransactionManager() {
        JpaTransactionManager tm = new JpaTransactionManager();
        tm.setEntityManagerFactory(this.batchEntityManagerFactory().getObject());
        
        return tm;
    }
}
    • DB1DbConfig.java
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
   entityManagerFactoryRef = "db1EntityManagerFactory",
   basePackages = { "idv.steven.database.db1.dao" },
   transactionManagerRef = "db1TransactionManager"
)
public class DB1DbConfig {
 @Autowired
 @Qualifier("db1Datasource")
 private DataSource datasource;
 
    private JpaVendorAdapter jpaVendorAdapter() {
        HibernateJpaVendorAdapter adapter = new HibernateJpaVendorAdapter();

        adapter.setDatabase(Database.ORACLE);
        adapter.setShowSql(true);
        adapter.setGenerateDdl(false);
        adapter.setDatabasePlatform("org.hibernate.dialect.Oracle12cDialect");

        return adapter;
    }
    
    /**
     * 載入 Entity
     * @return
     */
    @Primary
    @Bean
    public LocalContainerEntityManagerFactoryBean db1EntityManagerFactory() {
        LocalContainerEntityManagerFactoryBean emf = new LocalContainerEntityManagerFactoryBean();
        emf.setDataSource(datasource);
        emf.setPackagesToScan(new String[] { "idv.steven.database.db1.entity" });
        emf.setJpaVendorAdapter(this.jpaVendorAdapter());
        emf.setSharedCacheMode(SharedCacheMode.NONE);

        return emf;
    }
    
    @Primary
    @Bean(name = "db1TransactionManager")
    public PlatformTransactionManager db1TransactionManager() {
        JpaTransactionManager tm = new JpaTransactionManager();
        tm.setEntityManagerFactory(this.db1EntityManagerFactory().getObject());
        
        return tm;
    }
}
    • DB2DbConfig.java
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
   entityManagerFactoryRef = "db2EntityManagerFactory",
   basePackages = { "idv.steven.database.db2.dao" },
   transactionManagerRef = "db2TransactionManager"
)
public class DB2DbConfig {
 @Autowired
 @Qualifier("db2Datasource")
 private DataSource datasource;
 
    private JpaVendorAdapter jpaVendorAdapter() {
        HibernateJpaVendorAdapter adapter = new HibernateJpaVendorAdapter();

        adapter.setDatabase(Database.ORACLE);
        adapter.setShowSql(true);
        adapter.setGenerateDdl(false);
        adapter.setDatabasePlatform("org.hibernate.dialect.Oracle12cDialect");

        return adapter;
    }
    
    /**
     * 載入 Entity
     * @return
     */
    @Bean
    public LocalContainerEntityManagerFactoryBean db2EntityManagerFactory() {
        LocalContainerEntityManagerFactoryBean emf = new LocalContainerEntityManagerFactoryBean();
        emf.setDataSource(datasource);
        emf.setPackagesToScan(new String[] { "idv.steven.database.db2.entity" });
        emf.setJpaVendorAdapter(this.jpaVendorAdapter());
        emf.setSharedCacheMode(SharedCacheMode.NONE);

        return emf;
    }
    
    @Bean(name = "db2TransactionManager")
    public PlatformTransactionManager db2TransactionManager() {
        JpaTransactionManager tm = new JpaTransactionManager();
        tm.setEntityManagerFactory(this.db2EntityManagerFactory().getObject());
        
        return tm;
    }
}
三個 transaction manager 需有一個設定為 primary,萬一有任何的 dao 沒有指明用那一個,系統才知道預設是那個。
  • 指定 spring batch 使用那一個 dataSource
@Configuration
@EnableBatchProcessing
public class BatchConfig {
    @Bean
    BatchConfigurer configurer(@Qualifier("batchDatasource") DataSource dataSource){
  return new DefaultBatchConfigurer(dataSource);
    }
}

在有 @EnableBatchProcessing 的類別裡,建立一個 BatchConfigurer 的 bean,指定 spring batch 要使用的 dataSource。

2018年12月7日 星期五

Spring Batch: FixedLengthTokenizer

在 2014 年我寫的一篇「剖析固定長度欄位的訊息字串」說明怎麼方便的剖析一個固定欄位長度的檔案,這一篇打算改用 Spring Batch,在往下看之前,建議回頭看一下這兩篇:
  1. 剖析固定長度欄位的訊息字串
  2. Spring Batch: getting started
在說明 Spring Batch 怎麼處理固定長度欄位檔案前,先看一下 Spring Batch 怎麼處理 csv 檔。如下是 csv 檔的內容:
Buterin,24,Anglo-Saxon,Canada
中本聰,47,大和民族,波士頓
只有兩筆資料 … 程式如下: (Person 等相關類別請參考 Spring Batch: getting started)
FlatFileItemReader<Person> itemReader = new FlatFileItemReader<Person>();
itemReader.setResource(new ClassPathResource("Person.csv"));
 
DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<Person>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer(){{
    setNames(new String[] { "name", "age", "nation", "address" });
}});
lineMapper.setFieldSetMapper(new PersonFieldSetMapper());
itemReader.setLineMapper(lineMapper);
  
itemReader.open(new ExecutionContext());
  
Person person = null;
while ((person = itemReader.read()) != null) {
    System.out.println(person.toString());
}
說明如下:
  • FlatFileItemReader: 這個類別可用來讀取文字檔,當然,csv 檔是文字檔的一種,也用來讀取 csv 檔。它主要依賴兩類別 -- Resource 及 LineMapper,前者為 spring 提供的基礎類別,可以存取檔案或網路資源,這裡使用的 ClassPathResource 會到 classpath 目錄下讀取指定的檔案。
  • DefaultLineMapper: spring batch 定義了 LineMapper 介面,並實作多個類別,這些類別是用來將 String 轉換成相對應的 Object,DefaultLineMapper 可用來處理有分隔符號或固定長度欄位的字串。
  • DelimitedLineTokenizer: 當要處理的字串為有分隔符號的,就用這個類別,這裡有使用 setNames 傳入欄位名稱,這是方便在 PersonFieldSetMapper  (前一篇) 中使用欄位名稱存取各欄位的值,沒有設定欄位名稱,可以用 index,從 0 開始。
現在改成處理固定長度欄位檔案,檔案不再用逗點分隔欄位,改成如下:
Buterin   24Anglo-Saxon       Canada    
中本聰    47大和民族          波士頓    
程式碼幾乎不用改,差別只有一個,就是將 tokenizer 改成 FixedLengthTokenizer !! 程式如下。
FlatFileItemReader itemReader = new FlatFileItemReader();
itemReader.setResource(new ClassPathResource("Person.txt"));
  
DefaultLineMapper lineMapper = new DefaultLineMapper();
FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();
tokenizer.setNames(new String[] { "name", "age", "nation", "address" });
  
Range range1 = new Range(1, 10);
Range range2 = new Range(11, 12);
Range range3 = new Range(13, 30);
Range range4 = new Range(31, 40);
tokenizer.setColumns(new Range[] { range1, range2, range3, range4 });
  
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new PersonFieldSetMapper());
itemReader.setLineMapper(lineMapper);
  
itemReader.open(new ExecutionContext());
  
Person person = null;
while ((person = itemReader.read()) != null) {
    System.out.println(person.toString());
}
如上,除了 tokenizer 改成 FixedLengthTokenizer,要設定每個欄位的開始位置、結束位置,開始位置從 1 開始。執行的結果是,讀第一行時沒問題,第二行就出現如下 exception 了!
org.springframework.batch.item.file.FlatFileParseException: Parsing error at line: 2 in resource=[class path resource [Person.txt]], input=[中本聰    47大和民族          波士頓    ]
…
Caused by: org.springframework.batch.item.file.transform.IncorrectLineLengthException: Line is shorter than max range 40
因為 Java 預設的編碼為 UTF-8,我的檔案編碼是 MS950,在切 token 時,spring batch 會檢查字串長度,發現長度不足最長的 40,就拋出 exception 了。這時候可以改寫 FixedLengthTokenizer,這裡寫了一個命名為 ZhFixedLengthTokenizer 的類別。
@Slf4j
public class ZhFixedLengthTokenizer extends FixedLengthTokenizer {
    private Range[] ranges;
    private int maxRange = 0;
    boolean open = false;
 
    public void setColumns(Range[] columns) {
        this.ranges = columns;
    }
 
    @Override
    public List<String> doTokenize(String line) {
        List<String> tokens = new ArrayList<String>(ranges.length);
        String token;

        try {
            byte[] b = line.getBytes("MS950");
            int lineLength = b.length;

            for (int i = 0; i < ranges.length; i++) {
                int startPos = ranges[i].getMin() - 1;
                int endPos = ranges[i].getMax();

                if (lineLength >= endPos) {
                    token = getZhString(b, startPos, endPos);
                }
                else if (lineLength >= startPos) {
                    token = getZhString(b, startPos, lineLength);
                }
                else {
                    token = "";
                }

                tokens.add(token);
            }
        } catch (UnsupportedEncodingException e) {
            log.error(e.getMessage(), e);
        }

        return tokens;
    }

    private String getZhString(byte[] b, int startPos, int endPos) throws UnsupportedEncodingException {
        String token;
        byte[] subB = Arrays.copyOfRange(b, startPos, endPos);
        token = new String(subB, "MS950");
        return token;
    }
}

這個類別繼承了 FixedLengthTokenizer,然後覆寫其中的 doTokenize,把切 token 時的字串編碼改為 MS950,這樣就可以得到正確結果了! 這些程式碼基本上是從原本的 FixedLengthTokenizer 裡 copy 過來改寫的,然後把 tokenizer 改用這個類別就可以了,如下:
FixedLengthTokenizer tokenizer = new ZhFixedLengthTokenizer();

2018年5月23日 星期三

Spring Batch: getting started

雖然很多系統都有批次處理的需求,但是,spring batch 似乎並沒有很流行? 這裡看一下怎麼開始寫第一支 spring batch 的程式。


如上圖,程式會讀取一個檔案 people.csv 的資料,經過處理寫入資料庫。people.csv 的內容如下:
中本聰,47,大和民族,波士頓
很簡單的一個 csv 檔,只有一筆資料,四個欄位,分別記載著「名字」、「年齡」、「民族」、「居住地」。資料庫的 table 如下:

有五個欄位,除了第一個欄位 ID 是資料庫自行編碼的序號外,分別為 name、age、nation、address,用來記錄上面四個欄位的資料。現在開始看一下程式怎麼寫?
  • build.gradle
buildscript {
    ext {
        springBootVersion = '2.0.2.RELEASE'
    }
    repositories {
      mavenCentral()
      jcenter()
      maven { url "https://repo.spring.io/libs-release" }
      maven { url "http://maven.springframework.org/milestone" }
      maven { url "http://repo.maven.apache.org/maven2" }
      maven { url "http://repo1.maven.org/maven2/" }
      maven { url "http://amateras.sourceforge.jp/mvn/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'idv.steven.mybatch'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
     mavenCentral()
     jcenter()
     maven { url "https://repo.spring.io/libs-release" }
     maven { url "http://maven.springframework.org/milestone" }
     maven { url "http://repo.maven.apache.org/maven2" }
     maven { url "http://repo1.maven.org/maven2/" }
     maven { url "http://amateras.sourceforge.jp/mvn/" }
}

configurations.all {
    //sping boot 預設使用logback,先移除
    exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging'
}

dependencies {
    def log4j2Version = '2.7'

    compile('org.springframework.boot:spring-boot-starter-batch')
    compile('org.springframework.boot:spring-boot-starter-data-jpa')
    compileOnly('org.projectlombok:lombok')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('org.springframework.batch:spring-batch-test')
 
    compile fileTree(dir: 'libs', include: ['*.jar'])
 
    compile group: 'javax.validation', name: 'validation-api', version: '2.0.1.Final'
    compile group: 'org.hibernate', name: 'hibernate-validator', version: '5.3.6.Final'
    compile group: 'org.glassfish.web', name: 'el-impl', version: '2.2'

    compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: "${log4j2Version}"
    compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: "${log4j2Version}"
    compile group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: "${log4j2Version}" 
}
我使用 Gradle 管理 jar 檔,所以需要如上檔案,最重要的是紅色那兩行,分別引入 spring batch 及 spring data,這樣程式可以進行批次處理及寫入資料庫了。
  • application.properties
spring.datasource.url=jdbc:oracle:thin:@192.168.51.168:1521:testdb
spring.datasource.username=testuser
spring.datasource.password=testpass
這是 spring boot 預設的設定檔,在裡面寫入如上內容,spring boot 就會幫我們做好所有資料庫相關初始化的工作。
  • spring boot application
@SpringBootApplication
public class MyBatchApplication {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(MyBatchApplication.class, args);
    }
}
spring boot 程式就行這裡開始,如果還不清楚,可以參考「第一個 spring boot web 程式 (使用 embedded tomcat)」。
  • Person
@Entity
@Table(name="PERSON")
@Data
public class Person {
    @Id
    @GeneratedValue(strategy=GenerationType.SEQUENCE, generator="SEQ_PERSON")
    @SequenceGenerator(name="SEQ_PERSON", sequenceName="SEQ_PERSON", allocationSize=1)
    @Column(name="ID", nullable=false)
    private Long id;
 
    private String name;
 
    private Long age;
 
    private String nation;
 
    private String address;
}
JPA 的 entity 定義如上,對應前面的 table PERSON。
  • PersonDao
public interface PersonDao extends CrudRepository<Person, Long> {

}
有 Entity 之後,自然要寫個 DAO 來存取,如上,這是 spring data 提供的簡便方法,如果不了解,可以參考「spring data: CrudRepository 自動生成」。
  • BatchConfig
@Configuration
@EnableBatchProcessing
public class BatchConfig {

}
只要在 @Configuration 所在類別加入 @EnableBatchProcessing,spring batch 會自動設定好基本的設定,並建立起以下幾個 bean: (括號中為 bean name)
  1. JobRepository (jobRepository)
  2. JobLauncher  (jobLauncher)
  3. JobRegistry (jobRegistry)
  4. PlatformTransactionManager  (transactionManager)
  5. JobBuilderFactory (jobBuilders)
  6. StepBuilderFactory (stepBuilders)
並且會將執行過程及結果寫入底下六個 tables:
  1. BATCH_JOB_EXECUTION
  2. BATCH_JOB_EXECUTION_CONTEXT
  3. BATCH_JOB_EXECUTION_PARAMS
  4. BATCH_JOB_INSTANCE
  5. BATCH_STEP_EXECUTION
  6. BATCH_STEP_EXECUTION_CONTEXT
如果因為測試時不想寫出那麼多 log,或是基於各種考量,不希望 spring batch 記下這些 log,可以改成如下,使用預設的 spring batch 設定 (繼承 DefaultBatchConfigurer),但是在注入 DataSource 時刻意將它忽略,這樣 spring batch 就不會寫 log 到資料庫了。
@Configuration
@EnableBatchProcessing
public class BatchConfig extends DefaultBatchConfigurer {
    @Override
    public void setDataSource(DataSource dataSource) {
        // override to do not set datasource even if a datasource exist.
        // initialize will use a Map based JobRepository (instead of database)
    }
}
  • PersonJob
@Configuration
public class PersonJob {
    @Autowired
    private CsvJobListener listener;
 
    @Bean
    public Job importJob(JobBuilderFactory jobs, Step s1) {
        return jobs.get("importJob")
          .incrementer(new RunIdIncrementer())
          .flow(s1)
          .end()
          .listener(listener)
          .build();
    }
    
    @Bean
    public Step step1(
        StepBuilderFactory stepBuilderFactory, 
        ItemReader<Person> reader, 
        ItemWriter<Person> writer,
        ItemProcessor<Person,Person> processor) {

        return stepBuilderFactory
          .get("step1")
          .<Person, Person>chunk(500)
          .reader(reader)
          .processor(processor)
          .writer(writer)
          .build();
    }
    
    @Bean
    public FlatFileItemReader<Person> reader() throws Exception {
        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
        reader.setResource(new ClassPathResource("people.csv"));
        reader.setLineMapper(
            new DefaultLineMapper<Person>() {{
                setLineTokenizer(new DelimitedLineTokenizer() {{
                    setNames(new String[] { "name", "age", "nation", "address" });
                }});
        
                setFieldSetMapper(new PersonFieldSetMapper());
            }});

        return reader;
    }
}

這裡定義了三個 bean,第一個是定義了一個 Job; 第二個 bean 是定義 Job 中的步驟,因為這個 Job 只有一個步驟,就只有一個 Step bean,其中 500 表示每 500 筆存一次,spring batch 不會每讀一筆就存一次,這樣太沒效率了; 第三個 bean 用來讀取 CSV 檔,可以看到欄位與的 Person 類別一致,當 spring batch 讀取一行 CSV 的資料,就會產生一個 Person object 來儲存。
  • PersonItemWriter
@Component
public class PersonItemWriter implements ItemWriter {
    @Autowired
    private PersonDao daoPerson;

    @Override
    public void write(List items) throws Exception {
        items.forEach(i -> {
            daoPerson.save(i);
        }); 
    }
}
寫入資料庫,我簡單的用 spring data 提供的 DAO 寫入。
  • PersonFieldSetMapper
@Component
public class PersonFieldSetMapper implements FieldSetMapper {

    @Override
    public Person mapFieldSet(FieldSet fieldSet) throws BindException {
        Person person = new Person();
        person.setName(fieldSet.readString("name"));
        person.setAge(fieldSet.readLong("age"));
        person.setNation(fieldSet.readString("nation"));
        person.setAddress(fieldSet.readString("address"));

        return person;
    }
}
由 reader 讀入的資料在這裡轉換成 Person object。
  • PersonItemProcessor
@Component
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
    @Override
    public Person process(Person item) throws ValidationException {
        if (item.getNation().equals("大和民族")) {
            item.setNation("01");
        } else {
            item.setNation("02");
        }
        return item;
    }
}
由 csv 讀入資料後,會傳到這個類別,如果要做一些處理,可以在這裡進行,要特別注意傳回值如果是  null,則 spring batch 不會呼叫 writer,這表示在這裡我們可以篩選進來的資料,不符合需求的就不寫入。要特別注意的是,spring batch 在 ItemProcessor 也會計算次數,看看 ItemReader 送來的資料筆數是否已經等於前面設定的 chunk 筆數,等於了才會送給 ItemWriter,流程會像這樣 …
  • CsvJobListener
@Component
@Slf4j
public class CsvJobListener implements JobExecutionListener {
    private long startTime;
    private long endTime;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = System.currentTimeMillis();
        log.info("任務處理開始");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        endTime = System.currentTimeMillis();
        log.info("任務處理結束");
        log.info("耗時:" + (endTime - startTime) + "ms");
    }
}
Listener 並非必要的,這裡用它來寫一些 log,好方便我們觀察。
  • 執行結果


執行後查詢一下資料庫,應該可以看到已經寫入了。