中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

springbatch的封裝與使用

springbatch

主要實現批量數據的(de)(de)(de)處理(li)(li),我對(dui)batch進行的(de)(de)(de)封裝,提出了jobBase類型,具體job需要實現它即可(ke)。Spring Batch 不僅提供(gong)了統(tong)一的(de)(de)(de)讀寫(xie)接口(kou)、豐富(fu)的(de)(de)(de)任務(wu)處理(li)(li)方式、靈活的(de)(de)(de)事務(wu)管理(li)(li)及(ji)并發(fa)處理(li)(li),同時還支(zhi)持日志、監(jian)控、任務(wu)重啟與跳過等特(te)性,大大簡化了批處理(li)(li)應用(yong)開(kai)發(fa),將開(kai)發(fa)人員從復雜的(de)(de)(de)任務(wu)配置管理(li)(li)過程中解(jie)放出來,使他們(men)可(ke)以(yi)更多地去關注核(he)心(xin)的(de)(de)(de)業(ye)務(wu)處理(li)(li)過程。

幾個組件

  • job
  • step
  • read
  • write
  • listener
  • process
  • validator

JobBase定義了幾個公用的方法

 /**
  * springBatch的job基礎類.
  */
 public abstract class JobBase<T> {
 
   /**
    * 批次.
    */
   protected int chunkCount = 5000;
   /**
    * 監聽器.
    */
   private JobExecutionListener jobExecutionListener;
   /**
    * 處理器.
    */
   private ValidatingItemProcessor<T> validatingItemProcessor;
   /**
    * job名稱.
    */
   private String jobName;
   /**
    * 檢驗器.
    */
   private Validator<T> validator;
   @Autowired
   private JobBuilderFactory job;
   @Autowired
   private StepBuilderFactory step;
 
 
   /**
    * 初始化.
    *
    * @param jobName                 job名稱
    * @param jobExecutionListener    監聽器
    * @param validatingItemProcessor 處理器
    * @param validator               檢驗
    */
   public JobBase(String jobName,
                  JobExecutionListener jobExecutionListener,
                  ValidatingItemProcessor<T> validatingItemProcessor,
                  Validator<T> validator) {
     this.jobName = jobName;
     this.jobExecutionListener = jobExecutionListener;
     this.validatingItemProcessor = validatingItemProcessor;
     this.validator = validator;
   }
 
   /**
    * job初始化與啟動.
    */
   public Job getJob() throws Exception {
     return job.get(jobName).incrementer(new RunIdIncrementer())
         .start(syncStep())
         .listener(jobExecutionListener)
         .build();
   }
 
   /**
    * 執行步驟.
    *
    * @return
    */
   public Step syncStep() throws Exception {
     return step.get("step1")
         .<T, T>chunk(chunkCount)
         .reader(reader())
         .processor(processor())
         .writer(writer())
         .build();
   }
 
   /**
    * 單條處理數據.
    *
    * @return
    */
   public ItemProcessor<T, T> processor() {
     validatingItemProcessor.setValidator(processorValidator());
     return validatingItemProcessor;
   }
 
   /**
    * 校驗數據.
    *
    * @return
    */
   @Bean
   public Validator<T> processorValidator() {
     return validator;
   }
 
   /**
    * 批量讀數據.
    *
    * @return
    * @throws Exception
    */
   public abstract ItemReader<T> reader() throws Exception;
 
   /**
    * 批量寫數據.
    *
    * @return
    */
   @Bean
   public abstract ItemWriter<T> writer();
 
 }

主要規定了公用方(fang)法的執行策略,而具體的job名稱,讀,寫還是需(xu)要具體JOB去實現(xian)的。

具體Job實現

 @Configuration
 @EnableBatchProcessing
 public class SyncPersonJob extends JobBase<Person> {
   @Autowired
   private DataSource dataSource;
   @Autowired
   @Qualifier("primaryJdbcTemplate")
   private JdbcTemplate jdbcTemplate;
 
   /**
    * 初始化,規則了job名稱和監視器.
    */
   public SyncPersonJob() {
     super("personJob", new PersonJobListener(), new PersonItemProcessor(), new BeanValidator<>());
   }
 
   @Override
   public ItemReader<Person> reader() throws Exception {
     StringBuffer sb = new StringBuffer();
     sb.append("select * from person");
     String sql = sb.toString();
     JdbcCursorItemReader<Person> jdbcCursorItemReader =
         new JdbcCursorItemReader<>();
     jdbcCursorItemReader.setSql(sql);
     jdbcCursorItemReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
     jdbcCursorItemReader.setDataSource(dataSource);
 
     return jdbcCursorItemReader;
   }
 
 
   @Override
   @Bean("personJobWriter")
   public ItemWriter<Person> writer() {
     JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
     writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
     String sql = "insert into person_export " + "(id,name,age,nation,address) "
         + "values(:id, :name, :age, :nation,:address)";
     writer.setSql(sql);
     writer.setDataSource(dataSource);
     return writer;
   }
 
 }

寫操作需要定義自己的bean的聲明

注意,需(xu)要為(wei)每(mei)個(ge)job的write啟個(ge)名稱,否則在多job時,write將會(hui)被打(da)亂(luan)

  /**
   * 批量寫數據.
   *
   * @return
   */
  @Override
  @Bean("personVerson2JobWriter")
  public ItemWriter<Person> writer() {
   
  }

添加一個api,手動觸發

 @Autowired
  SyncPersonJob syncPersonJob;

  @Autowired
  JobLauncher jobLauncher;

  void exec(Job job) throws Exception {
    JobParameters jobParameters = new JobParametersBuilder()
        .addLong("time", System.currentTimeMillis())
        .toJobParameters();
    jobLauncher.run(job, jobParameters);
  }

  @RequestMapping("/run1")
  public String run1() throws Exception {
    exec(syncPersonJob.getJob());
    return "personJob success";
  }
posted @ 2019-04-17 13:38  張占嶺  閱讀(2396)  評論(0)    收藏  舉報