转载

Spring并行批处理

Spring Batch提供了可处理大量记录所必需的可重用功能,包括日志记录/跟踪,事务管理,作业处理统计,作业重启,跳过和资源管理等交叉问题。这里展示一个并行运行多个作业的示例,作业彼此独立并以并行方式完成执行。SpringBatch入口概念是Job,一个Job由多个step步骤组成,通过步骤的不同并行方式实现并行批处理,步骤并行模式有以下几个方式:

  • 步骤step是多线程(单个进程)

  • 步骤step是并行的(单个过程)

  • 步骤是远程分块(多进程)

  • 步骤分区数据分片(单个或多个过程)

多线程步骤

启动并行处理的最简单方法是在Step配置中添加一个TaskExecutor。

@Bean
public TaskExecutor taskExecutor(){
   return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
   return this.stepBuilderFactory.get("sampleStep")
         .<String, String>chunk(10)
         .reader(itemReader())
         .writer(itemWriter())
         .taskExecutor(taskExecutor)
         .build();
}

TaskExecutor 是一个标准的Spring接口,最简单的多线程TaskExecutor是 SimpleAsyncTaskExecutor,上述配置的结果是Step通过在单独的执行线程中进行读取,处理和输出每个块。请注意,这意味着处理条目是没有固定的顺序。线程池默认为4个线程.你增加此限制以确保线程池是充分利用

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
   return this.stepBuilderFactory.get("sampleStep")
         .<String, String>chunk(10)
         .reader(itemReader())
         .writer(itemWriter())
         .taskExecutor(taskExecutor)
         .throttleLimit(20)
         .build();
}

如果在步骤中使用了数据库连接池,这些连接池的最大连接数也可能会限制批处理的并发性,确保这些资源池中设置至少与步骤中所需的并发线程数一样大。

步骤step对于多线程使用还是有一定限制,其条目的读入、处理和输出处理器都是有状态的。如果状态没有被线程隔离,那么这些组件在多线程中不可用Step。你可以使用SynchronizedItemStreamReader确保线程安全。

并行步骤

如果你的业务逻辑可以分成不同的职责并分配给各个步骤,那么它就可以在一个进程中并行化。并行步执行易于配置和使用。

首先使用FlowBuilder构建一个个小的flow流程,在这个流程里面指定步骤,两个流程flow是并行执行的,下面有两个并行流flow1和flow2,flow1里面有step1 step2先后顺序,flow2有step3,也就是说{step1,step2}一起和step3是并行的:

@Bean
public Job job() {
   return jobBuilderFactory.get("job")
         .start(splitFlow())
         .next(step4())
         .build()        //builds FlowJobBuilder instance
         .build();       //builds Job instance
}

@Bean
public Flow splitFlow() {
   return new FlowBuilder<SimpleFlow>("splitFlow")
         .split(taskExecutor())
         .add(flow1(), flow2())
         .build();
}

@Bean
public Flow flow1() {
   return new FlowBuilder<SimpleFlow>("flow1")
         .start(step1())
         .next(step2())
         .build();
}

@Bean
public Flow flow2() {
   return new FlowBuilder<SimpleFlow>("flow2")
         .start(step3())
         .build();
}

@Bean
public TaskExecutor taskExecutor(){
   return new SimpleAsyncTaskExecutor("spring_batch");
}

需要指定TaskExecutor 应该使用哪个实现来执行各个流。默认值为 SyncTaskExecutor没有用,需要异步TaskExecutor才能并行运行这些步骤。

再看一个并行案例:

@Bean
public Job parallelStepsJob() {

   Flow masterFlow = new FlowBuilder<Flow>("masterFlow").start(taskletStep("step1")).build();


   Flow flowJob1 = new FlowBuilder<Flow>("flow1").start(taskletStep("step2")).build();
   Flow flowJob2 = new FlowBuilder<Flow>("flow2").start(taskletStep("step3")).build();
   Flow flowJob3 = new FlowBuilder<Flow>("flow3").start(taskletStep("step4")).build();

   Flow slaveFlow = new FlowBuilder<Flow>("splitflow")
         .split(new SimpleAsyncTaskExecutor()).add(flowJob1, flowJob2, flowJob3).build();

   return (jobBuilderFactory.get("parallelFlowJob")
         .incrementer(new RunIdIncrementer())
         .start(masterFlow)
         .next(slaveFlow)
         .build()).build();

}


private TaskletStep taskletStep(String step) {
   return stepBuilderFactory.get(step).tasklet((contribution, chunkContext) -> {
      IntStream.range(1, 100).forEach(token -> logger.info("Step:" + step + " token:" + token));
      return RepeatStatus.FINISHED;
   }).build();

}

这里有四个流程,主流程masterFlow是最先开始,然后并行的是flowJob1, flowJob2, flowJob3

下面我们将分析分布式的处理方式。见spring batch批处理

原文  https://www.jdon.com/springboot/spring-batch-parallel.html
正文到此结束
Loading...