Wrapping and Using Spring Batch
Spring Batch is mainly used for batch data processing. I wrapped it in a JobBase type, and a concrete job only needs to extend it. Spring Batch provides unified read and write interfaces, a rich set of task-processing styles, flexible transaction management, and concurrent processing. It also supports logging, monitoring, job restart, and skip. Together these greatly simplify batch application development and free developers from complex job configuration and management, so they can focus on the core business logic.
Key components
- job
- step
- read
- write
- listener
- process
- validator
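The components above fit together in a chunk-oriented flow: a step reads items one at a time, passes each through the processor (where validation happens), and hands the results to the writer one chunk at a time. The following is a minimal plain-Java sketch of that flow, not Spring Batch's actual implementation. The names `ChunkSketch` and `run` are hypothetical, and the sketch assumes that a processor returning null means "filter this item out".

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Simplified model of chunk-oriented processing (hypothetical, for illustration only). */
public class ChunkSketch {

    /**
     * Reads items one by one, processes each, and "writes" them in chunks.
     * A chunk is closed after chunkSize items have been read.
     * A null result from the processor drops the item from the chunk.
     *
     * @return the chunks that would be passed to the writer, in order
     */
    public static <I, O> List<List<O>> run(Iterator<I> reader,
                                           Function<I, O> processor,
                                           int chunkSize) {
        List<List<O>> written = new ArrayList<>();
        List<O> chunk = new ArrayList<>();
        int readInChunk = 0;
        while (reader.hasNext()) {
            I item = reader.next();
            readInChunk++;
            O result = processor.apply(item);
            if (result != null) {
                chunk.add(result);
            }
            if (readInChunk == chunkSize) {
                if (!chunk.isEmpty()) {
                    written.add(chunk); // one writer call per chunk
                }
                chunk = new ArrayList<>();
                readInChunk = 0;
            }
        }
        if (!chunk.isEmpty()) {
            written.add(chunk); // final, possibly smaller chunk
        }
        return written;
    }
}
```

In this model the chunk size plays the role of `chunkCount` in JobBase: a larger value means fewer, bigger writes.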
JobBase defines several shared methods
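import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;

/**
 * Base class for Spring Batch jobs.
 */
public abstract class JobBase<T> {
    /**
     * Chunk size: number of items handled per chunk.
     */
    protected int chunkCount = 5000;
    /**
     * Job execution listener.
     */
    private JobExecutionListener jobExecutionListener;
    /**
     * Item processor that validates each item.
     */
    private ValidatingItemProcessor<T> validatingItemProcessor;
    /**
     * Job name.
     */
    private String jobName;
    /**
     * Validator.
     */
    private Validator<T> validator;
    @Autowired
    private JobBuilderFactory job;
    @Autowired
    private StepBuilderFactory step;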
    /**
     * Constructor.
     *
     * @param jobName                 job name
     * @param jobExecutionListener    job execution listener
     * @param validatingItemProcessor item processor
     * @param validator               validator
     */
    public JobBase(String jobName,
                   JobExecutionListener jobExecutionListener,
                   ValidatingItemProcessor<T> validatingItemProcessor,
                   Validator<T> validator) {
        this.jobName = jobName;
        this.jobExecutionListener = jobExecutionListener;
        this.validatingItemProcessor = validatingItemProcessor;
        this.validator = validator;
    }
    /**
     * Builds the job; launching it is left to the caller.
     */
    public Job getJob() throws Exception {
        return job.get(jobName).incrementer(new RunIdIncrementer())
                .start(syncStep())
                .listener(jobExecutionListener)
                .build();
    }
    /**
     * Builds the step: read, process, and write in chunks of chunkCount items.
     *
     * @return the configured step
     */
    public Step syncStep() throws Exception {
        return step.get("step1")
                .<T, T>chunk(chunkCount)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }
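    /**
     * Processes items one at a time.
     *
     * @return the validating processor, wired with this job's validator
     */
    public ItemProcessor<T, T> processor() {
        validatingItemProcessor.setValidator(processorValidator());
        return validatingItemProcessor;
    }
    /**
     * Validates items.
     *
     * @return the validator passed to the constructor
     */
    @Bean
    public Validator<T> processorValidator() {
        return validator;
    }
    /**
     * Reads items in bulk.
     *
     * @return the reader supplied by the concrete job
     * @throws Exception if the reader cannot be created
     */
    public abstract ItemReader<T> reader() throws Exception;
    /**
     * Writes items in bulk.
     *
     * @return the writer supplied by the concrete job
     */
    @Bean
    public abstract ItemWriter<T> writer();
}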
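The base class fixes how the shared methods are executed, while the job name, reader, and writer still have to be supplied by each concrete job.
Concrete job implementation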
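import javax.sql.DataSource;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;

// Person, PersonJobListener, PersonItemProcessor and BeanValidator are
// assumed to be project classes; they are not shown in this post.
@Configuration
@EnableBatchProcessing
public class SyncPersonJob extends JobBase<Person> {
    @Autowired
    private DataSource dataSource;
    @Autowired
    @Qualifier("primaryJdbcTemplate")
    private JdbcTemplate jdbcTemplate;
    /**
     * Sets the job name, listener, processor, and validator.
     */
    public SyncPersonJob() {
        super("personJob", new PersonJobListener(), new PersonItemProcessor(), new BeanValidator<>());
    }
    @Override
    public ItemReader<Person> reader() throws Exception {
        // Stream rows from the person table and map each one to a Person bean.
        JdbcCursorItemReader<Person> jdbcCursorItemReader = new JdbcCursorItemReader<>();
        jdbcCursorItemReader.setSql("select * from person");
        jdbcCursorItemReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
        jdbcCursorItemReader.setDataSource(dataSource);
        return jdbcCursorItemReader;
    }
    @Override
    @Bean("personJobWriter")
    public ItemWriter<Person> writer() {
        // Batch-insert each chunk; named parameters are bound from Person properties.
        JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        String sql = "insert into person_export (id,name,age,nation,address) "
                + "values(:id, :name, :age, :nation, :address)";
        writer.setSql(sql);
        writer.setDataSource(dataSource);
        return writer;
    }
}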
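The writer needs its own bean declaration
Note: give each job's writer bean a unique name. By default a @Bean method registers under its method name, so with several jobs every writer() would use the same bean name, "writer", and the writers would get mixed up.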
/**
 * Writes items in bulk.
 *
 * @return the writer for this job
 */
@Override
@Bean("personVerson2JobWriter")
public ItemWriter<Person> writer() {
    // ... build and return this job's writer here
}
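Why the explicit name matters can be shown with a toy model of a registry keyed by bean name. This is only a sketch under stated assumptions, not Spring's container: `BeanNameSketch` and its methods are hypothetical, and it models the case where a duplicate name silently replaces the earlier definition (depending on the Spring version and settings, a duplicate may instead fail at startup).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy registry keyed by bean name (hypothetical; not Spring's actual container). */
public class BeanNameSketch {

    private final Map<String, String> beans = new LinkedHashMap<>();

    /**
     * Registers a definition under a name.
     *
     * @return the definition that was replaced, or null if the name was free
     */
    public String register(String beanName, String definition) {
        return beans.put(beanName, definition);
    }

    /** Returns the definition currently registered under the name, or null. */
    public String lookup(String beanName) {
        return beans.get(beanName);
    }

    /** Returns how many distinct bean names are registered. */
    public int size() {
        return beans.size();
    }
}
```

Registering two writers under the default name "writer" leaves a single entry, while names such as "personJobWriter" and "personVerson2JobWriter" keep both definitions available.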
Add an API endpoint to trigger the job manually
@Autowired
SyncPersonJob syncPersonJob;
@Autowired
JobLauncher jobLauncher;
void exec(Job job) throws Exception {
    // A fresh timestamp parameter makes every launch a new job instance.
    JobParameters jobParameters = new JobParametersBuilder()
            .addLong("time", System.currentTimeMillis())
            .toJobParameters();
    jobLauncher.run(job, jobParameters);
}
@RequestMapping("/run1")
public String run1() throws Exception {
    exec(syncPersonJob.getJob());
    return "personJob success";
}
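The "time" parameter in exec matters because Spring Batch identifies a job instance by the job name plus its identifying parameters, and a completed instance is not run again with the same parameters. The following toy model sketches that rule in plain Java. It is not Spring Batch's JobRepository, and `JobInstanceSketch` and `run` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Toy model of job-instance identity (hypothetical; not Spring Batch's JobRepository). */
public class JobInstanceSketch {

    private final Set<String> completed = new HashSet<>();

    /**
     * Records a run of the given job with the given parameters.
     *
     * @return true if the run is accepted, false if an identical instance already completed
     */
    public boolean run(String jobName, Map<String, Long> parameters) {
        // TreeMap gives a stable key order, so equal parameter sets yield equal keys.
        String instanceKey = jobName + new TreeMap<>(parameters);
        return completed.add(instanceKey);
    }
}
```

With a constant parameter value the second call is rejected in this model; with a fresh timestamp each call is treated as a new instance, which is what lets the /run1 endpoint be called repeatedly.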