Spring Batch提供了可处理大量记录所必需的可重用功能,包括日志记录/跟踪,事务管理,作业处理统计,作业重启,跳过和资源管理等交叉问题。这里展示一个并行运行多个作业的示例,作业彼此独立并以并行方式完成执行。SpringBatch入口概念是Job,一个Job由多个step步骤组成,通过步骤的不同并行方式实现并行批处理,步骤并行模式有以下几个方式:
步骤step是多线程(单个进程)
步骤step是并行的(单个过程)
步骤是远程分块(多进程)
步骤分区数据分片(单个或多个过程)
启动并行处理的最简单方法是在Step配置中添加一个TaskExecutor。
@Bean public TaskExecutor taskExecutor(){ return new SimpleAsyncTaskExecutor("spring_batch"); } @Bean public Step sampleStep(TaskExecutor taskExecutor) { return this.stepBuilderFactory.get("sampleStep") .<String, String>chunk(10) .reader(itemReader()) .writer(itemWriter()) .taskExecutor(taskExecutor) .build(); }
TaskExecutor 是一个标准的Spring接口,最简单的多线程TaskExecutor是 SimpleAsyncTaskExecutor,上述配置的结果是Step通过在单独的执行线程中进行读取,处理和输出每个块。请注意,这意味着处理条目是没有固定的顺序。线程池默认为4个线程.你增加此限制以确保线程池是充分利用
@Bean public Step sampleStep(TaskExecutor taskExecutor) { return this.stepBuilderFactory.get("sampleStep") .<String, String>chunk(10) .reader(itemReader()) .writer(itemWriter()) .taskExecutor(taskExecutor) .throttleLimit(20) .build(); }
如果在步骤中使用了数据库连接池,这些连接池的最大连接数也可能会限制批处理的并发性,确保这些资源池中设置至少与步骤中所需的并发线程数一样大。
步骤step对于多线程使用还是有一定限制,其条目的读入、处理和输出处理器都是有状态的。如果状态没有被线程隔离,那么这些组件在多线程中不可用Step。你可以使用SynchronizedItemStreamReader确保线程安全。
如果你的业务逻辑可以分成不同的职责并分配给各个步骤,那么它就可以在一个进程中并行化。并行步执行易于配置和使用。
首先使用FlowBuilder构建一个个小的flow流程,在这个流程里面指定步骤,两个流程flow是并行执行的,下面有两个并行流flow1和flow2,flow1里面有step1 step2先后顺序,flow2有step3,也就是说{step1,step2}一起和step3是并行的:
@Bean public Job job() { return jobBuilderFactory.get("job") .start(splitFlow()) .next(step4()) .build() //builds FlowJobBuilder instance .build(); //builds Job instance } @Bean public Flow splitFlow() { return new FlowBuilder<SimpleFlow>("splitFlow") .split(taskExecutor()) .add(flow1(), flow2()) .build(); } @Bean public Flow flow1() { return new FlowBuilder<SimpleFlow>("flow1") .start(step1()) .next(step2()) .build(); } @Bean public Flow flow2() { return new FlowBuilder<SimpleFlow>("flow2") .start(step3()) .build(); } @Bean public TaskExecutor taskExecutor(){ return new SimpleAsyncTaskExecutor("spring_batch"); }
需要指定TaskExecutor 应该使用哪个实现来执行各个流。默认值为 SyncTaskExecutor没有用,需要异步TaskExecutor才能并行运行这些步骤。
再看一个并行案例:
@Bean public Job parallelStepsJob() { Flow masterFlow = new FlowBuilder<Flow>("masterFlow").start(taskletStep("step1")).build(); Flow flowJob1 = new FlowBuilder<Flow>("flow1").start(taskletStep("step2")).build(); Flow flowJob2 = new FlowBuilder<Flow>("flow2").start(taskletStep("step3")).build(); Flow flowJob3 = new FlowBuilder<Flow>("flow3").start(taskletStep("step4")).build(); Flow slaveFlow = new FlowBuilder<Flow>("splitflow") .split(new SimpleAsyncTaskExecutor()).add(flowJob1, flowJob2, flowJob3).build(); return (jobBuilderFactory.get("parallelFlowJob") .incrementer(new RunIdIncrementer()) .start(masterFlow) .next(slaveFlow) .build()).build(); } private TaskletStep taskletStep(String step) { return stepBuilderFactory.get(step).tasklet((contribution, chunkContext) -> { IntStream.range(1, 100).forEach(token -> logger.info("Step:" + step + " token:" + token)); return RepeatStatus.FINISHED; }).build(); }
这里有四个流程,主流程masterFlow是最先开始,然后并行的是flowJob1, flowJob2, flowJob3
下面我们将分析分布式的处理方式。见spring batch批处理