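In Spring Batch, partitioning means splitting the data into slices and processing each slice separately. Suppose a single thread needs 10 minutes to process 100 items. We can instead split the 100 items into ten slices and process each slice on its own thread:
Thread 1 - Process from 1 to 10
Thread 2 - Process from 11 to 20
Thread 3 - Process from 21 to 30
......
Thread 9 - Process from 81 to 90
Thread 10 - Process from 91 to 100
The whole run might then take only about 1 minute.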
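The idea can be tried without Spring Batch at all. The following is a minimal plain-Java sketch, assuming the total divides evenly by the number of slices; the class name `PartitionedSum` and the "work" of summing ids are illustrative stand-ins, not part of the tutorial's project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java illustration of partitioned processing; this is not Spring Batch code.
class PartitionedSum {

    // Hypothetical stand-in for the per-item work: add up the ids in the range.
    static long processRange(int fromId, int toId) {
        long sum = 0;
        for (int id = fromId; id <= toId; id++) {
            sum += id;
        }
        return sum;
    }

    // Splits 1..total into equal ranges and processes each range on its own thread.
    static long processInParallel(int total, int partitions) {
        int range = total / partitions; // assumes total is divisible by partitions
        List<Callable<Long>> tasks = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            final int fromId = i * range + 1;
            final int toId = fromId + range - 1;
            tasks.add(() -> processRange(fromId, toId));
        }
        ExecutorService pool = Executors.newFixedThreadPool(partitions);
        try {
            long result = 0;
            // invokeAll returns once every partition has finished
            for (Future<Long> future : pool.invokeAll(tasks)) {
                result += future.get();
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(processInParallel(100, 10)); // prints 5050
    }
}
```

The actual speed-up depends on how much of the work can really run concurrently, so the "10 minutes to 1 minute" figure is a best case.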
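Partitioning is built on one master processor coordinating several slave processors.
A slave node can be a service on a remote server or a thread executing locally. The messages the master sends to the slaves do not need to be durable, nor do they need the strict guaranteed delivery that JMS provides: the Spring Batch metadata in the JobRepository ensures that each slave is executed once, and only once, for each Job execution.
Our example reads rows from the database table user into User objects and then writes them out to 10 CSV files.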
Here is the CSV output. Once the rows from the table have been written, it looks roughly like this:
id | username | password | age
1 | dd | password | 30
2 | dd | password | 24
3 | dd | password | 22
4 | dd | password | 28
5 | dd | password | 34
6 | dd | password | 20
The User model is:
@Data
@Entity
public class User {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    int id;
    String username;
    String password;
    int age;
}
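Lombok is used here to avoid writing setters and getters by hand. The JPA annotations are there mainly so that the user table can be generated automatically: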
CREATE TABLE `user` (
  `id` int(11) NOT NULL,
  `username` varchar(45) NOT NULL default '',
  `password` varchar(45) NOT NULL default '',
  `age` int(11),
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

insert into `user` (`id`, `age`, `password`, `username`) values('1','30','password','1dd');
insert into `user` (`id`, `age`, `password`, `username`) values('2','24','password','2dd');
insert into `user` (`id`, `age`, `password`, `username`) values('3','22','password','3dd');
insert into `user` (`id`, `age`, `password`, `username`) values('4','28','password','4dd');
insert into `user` (`id`, `age`, `password`, `username`) values('5','34','password','5dd');
insert into `user` (`id`, `age`, `password`, `username`) values('6','20','password','6dd');
Save this SQL as schema.sql next to application.properties. With a little startup configuration the table is then created automatically. application.properties is configured as follows:
spring.batch.initialize-schema=always
spring.datasource.url=jdbc:mysql://localhost:3306/mytest
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.jpa.generate-ddl=true
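The last line generates the User table automatically, and the first line creates Spring Batch's own metadata tables.
With the groundwork in place, here is the core partitioning class: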
@Slf4j // provides the `log` field used below
@Configuration
@EnableBatchProcessing
public class PartitionerJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    // The job runs the partitioned master step, then a plain follow-up step.
    @Bean
    public Job PartitionJob() {
        return jobBuilderFactory.get("partitionJob")
                .incrementer(new RunIdIncrementer())
                .start(masterStep())
                .next(step2())
                .build();
    }

    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2").tasklet(dummyTask()).build();
    }

    @Bean
    public DummyTasklet dummyTask() {
        return new DummyTasklet();
    }

    // Master step: rangePartitioner splits the work, the handler runs the slave step per partition.
    @Bean
    public Step masterStep() {
        return stepBuilderFactory.get("masterStep")
                .partitioner(slave().getName(), rangePartitioner())
                .partitionHandler(masterSlaveHandler())
                .build();
    }

    @Bean
    public PartitionHandler masterSlaveHandler() {
        TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
        handler.setGridSize(10);
        handler.setTaskExecutor(taskExecutor());
        handler.setStep(slave());
        try {
            handler.afterPropertiesSet();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return handler;
    }

    // Slave step: the null arguments are placeholders; the step-scoped beans
    // receive their real values from the step execution context at run time.
    @Bean(name = "slave")
    public Step slave() {
        log.info("...........called slave .........");
        return stepBuilderFactory.get("slave").<User, User>chunk(100)
                .reader(slaveReader(null, null, null))
                .processor(slaveProcessor(null))
                .writer(slaveWriter(null, null))
                .build();
    }

    @Bean
    public RangePartitioner rangePartitioner() {
        return new RangePartitioner();
    }

    @Bean
    public SimpleAsyncTaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor();
    }

    @Bean
    @StepScope
    public UserProcessor slaveProcessor(@Value("#{stepExecutionContext[name]}") String name) {
        log.info("********called slave processor **********");
        UserProcessor userProcessor = new UserProcessor();
        userProcessor.setThreadName(name);
        return userProcessor;
    }

    // Reads only the rows whose id falls inside this partition's range.
    @Bean
    @StepScope
    public JdbcPagingItemReader<User> slaveReader(
            @Value("#{stepExecutionContext[fromId]}") final String fromId,
            @Value("#{stepExecutionContext[toId]}") final String toId,
            @Value("#{stepExecutionContext[name]}") final String name) {
        log.info("slaveReader start " + fromId + " " + toId);
        JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource);
        reader.setQueryProvider(queryProvider());
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("fromId", fromId);
        parameterValues.put("toId", toId);
        log.info("Parameter Value " + name + " " + parameterValues);
        reader.setParameterValues(parameterValues);
        reader.setPageSize(1000);
        reader.setRowMapper(new BeanPropertyRowMapper<User>() {{
            setMappedClass(User.class);
        }});
        log.info("slaveReader end " + fromId + " " + toId);
        return reader;
    }

    @Bean
    public PagingQueryProvider queryProvider() {
        log.info("queryProvider start ");
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
        provider.setDataSource(dataSource);
        provider.setSelectClause("select id, username, password, age");
        provider.setFromClause("from user");
        provider.setWhereClause("where id >= :fromId and id <= :toId");
        provider.setSortKey("id");
        log.info("queryProvider end ");
        try {
            return provider.getObject();
        } catch (Exception e) {
            log.info("queryProvider exception ");
            e.printStackTrace();
        }
        return null;
    }

    // Writes each partition to its own CSV file, named after the id range.
    @Bean
    @StepScope
    public FlatFileItemWriter<User> slaveWriter(
            @Value("#{stepExecutionContext[fromId]}") final String fromId,
            @Value("#{stepExecutionContext[toId]}") final String toId) {
        FlatFileItemWriter<User> writer = new FlatFileItemWriter<>();
        writer.setResource(new FileSystemResource(
                "csv/outputs/users.processed" + fromId + "-" + toId + ".csv"));
        // writer.setAppendAllowed(false);
        writer.setLineAggregator(new DelimitedLineAggregator<User>() {{
            setDelimiter(",");
            setFieldExtractor(new BeanWrapperFieldExtractor<User>() {{
                setNames(new String[]{"id", "username", "password", "age"});
            }});
        }});
        return writer;
    }
}
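To make the writer's output concrete: slaveWriter names one file per id range and joins the four User fields with commas. The plain-Java sketch below mimics just those two string operations. The class `CsvLineSketch` and its method names are hypothetical and only illustrate the shape of the result, not how FlatFileItemWriter is implemented.

```java
import java.util.StringJoiner;

// Plain-Java illustration of the file name and line format produced by slaveWriter.
class CsvLineSketch {

    // Same naming pattern as the FileSystemResource path in slaveWriter.
    static String fileName(int fromId, int toId) {
        return "csv/outputs/users.processed" + fromId + "-" + toId + ".csv";
    }

    // Joins the fields in the order given to setNames: id, username, password, age.
    static String toLine(int id, String username, String password, int age) {
        return new StringJoiner(",")
                .add(String.valueOf(id))
                .add(username)
                .add(password)
                .add(String.valueOf(age))
                .toString();
    }

    public static void main(String[] args) {
        System.out.println(fileName(1, 10));                   // csv/outputs/users.processed1-10.csv
        System.out.println(toLine(1, "1dd", "password", 30));  // 1,1dd,password,30
    }
}
```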
In PartitionerJob, the master step is defined as follows:
public Step masterStep() {
    return stepBuilderFactory.get("masterStep")
            .partitioner(slave().getName(), rangePartitioner())
            .partitionHandler(masterSlaveHandler())
            .build();
}

@Bean
public RangePartitioner rangePartitioner() {
    return new RangePartitioner();
}
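The rangePartitioner object here is our core partitioning class. It decides which id range each slave processor works on once the data has been partitioned: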
@Slf4j
public class RangePartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        log.info("partition called gridsize= " + gridSize);
        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();

        int range = 10; // number of ids per partition
        int fromId = 1;
        int toId = range;

        for (int i = 1; i <= gridSize; i++) {
            ExecutionContext value = new ExecutionContext();

            System.out.println("\nStarting : Thread" + i);
            System.out.println("fromId : " + fromId);
            System.out.println("toId : " + toId);

            value.putInt("fromId", fromId);
            value.putInt("toId", toId);

            // give each thread a name, thread 1,2,3
            value.putString("name", "Thread" + i);

            result.put("partition" + i, value);

            // move on to the next range
            fromId = toId + 1;
            toId += range;
        }
        return result;
    }
}
For each partition, this class prints:
Starting : Thread1
fromId : 1
toId : 10
Starting : Thread2
fromId : 11
toId : 20
Starting : Thread3
fromId : 21
toId : 30
Starting : Thread4
fromId : 31
toId : 40
Starting : Thread5
fromId : 41
toId : 50
Starting : Thread6
fromId : 51
toId : 60
Starting : Thread7
fromId : 61
toId : 70
Starting : Thread8
fromId : 71
toId : 80
Starting : Thread9
fromId : 81
toId : 90
Starting : Thread10
fromId : 91
toId : 100
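RangePartitioner hard-codes a range of 10 ids per partition, so with only the six sample rows everything lands in the first partition and the other nine ranges are empty. One possible variation, sketched here in plain Java, derives the range size from the smallest and largest id instead. The class `EvenRangeSplitter` is hypothetical, a `Map<String, Integer>` stands in for Spring Batch's ExecutionContext, and in a real job the minimum and maximum ids would have to be queried from the table first.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch; Map<String, Integer> stands in for Spring Batch's ExecutionContext.
class EvenRangeSplitter {

    // Splits the inclusive id range [minId, maxId] into at most gridSize contiguous ranges.
    // Assumes minId <= maxId and gridSize > 0.
    static Map<String, Map<String, Integer>> split(int minId, int maxId, int gridSize) {
        Map<String, Map<String, Integer>> result = new LinkedHashMap<>();
        int total = maxId - minId + 1;
        int range = (total + gridSize - 1) / gridSize; // ceiling division
        int fromId = minId;
        for (int i = 1; i <= gridSize && fromId <= maxId; i++) {
            int toId = Math.min(fromId + range - 1, maxId); // never run past maxId
            Map<String, Integer> context = new LinkedHashMap<>();
            context.put("fromId", fromId);
            context.put("toId", toId);
            result.put("partition" + i, context);
            fromId = toId + 1;
        }
        return result;
    }

    public static void main(String[] args) {
        // Six rows, grid size 10: six one-row partitions instead of one full and nine empty.
        split(1, 6, 10).forEach((name, ctx) -> System.out.println(name + " " + ctx));
    }
}
```

With ids 1 to 100 and a grid size of 10, this produces the same ten ranges as the output above.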
Now go back to the master step and look at its next call, which sets the masterSlaveHandler:
@Bean
public PartitionHandler masterSlaveHandler() {
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
    handler.setGridSize(10);
    handler.setTaskExecutor(taskExecutor());
    handler.setStep(slave());
    try {
        handler.afterPropertiesSet();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return handler;
}
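This sets the grid size (the number of partitions) to 10, sets an asynchronous task executor, and registers the slave step with the handler.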
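Conceptually, a handler of this kind fans the partitions out to the executor, waits for all of them, and reports one combined outcome. The sketch below shows that fan-out and join in plain Java. It is an illustration under stated assumptions, not the TaskExecutorPartitionHandler implementation: the class `MiniPartitionHandler`, the boolean worker contract, and the rule that one failed partition fails the whole step are all simplifications made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.IntPredicate;

// Plain-Java sketch of a partition handler; not the Spring Batch implementation.
class MiniPartitionHandler {

    // Runs the worker once per partition on the executor and aggregates the outcome.
    // Hypothetical contract: the worker returns true when its partition succeeded.
    static String handle(int gridSize, IntPredicate worker, ExecutorService executor) {
        List<Future<Boolean>> futures = new ArrayList<>();
        for (int i = 1; i <= gridSize; i++) {
            final int partition = i;
            Callable<Boolean> task = () -> worker.test(partition);
            futures.add(executor.submit(task));
        }
        boolean allOk = true;
        for (Future<Boolean> future : futures) {
            try {
                allOk &= future.get(); // block until this partition finishes
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                allOk = false;
            } catch (ExecutionException e) {
                allOk = false; // a worker that throws counts as a failed partition
            }
        }
        return allOk ? "COMPLETED" : "FAILED";
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            System.out.println(handle(10, p -> true, executor));   // COMPLETED
            System.out.println(handle(10, p -> p != 7, executor)); // FAILED
        } finally {
            executor.shutdown();
        }
    }
}
```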
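The source code above is available on GitHub.
Remote partitioning = local partitioning + remote chunking (remote chunking is covered in an earlier tutorial). In practice it adds a JMS messaging system so that the computation can be coordinated across a distributed system. For the full code, see spring batch remote partition.
Here is the code for the main steps: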
@Bean
public Job remotePartitioningJob() {
    return this.jobBuilderFactory.get("remotePartitioningJob")
            .start(masterStep())
            .build();
}

/*
 * Configure the master step
 */
@Bean
public Step masterStep() {
    return this.masterStepBuilderFactory.get("masterStep")
            .partitioner("workerStep", new BasicPartitioner())
            .gridSize(GRID_SIZE)
            .outputChannel(requests())
            .build();
}
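BasicPartitioner here is similar to rangePartitioner. From this point on, things differ from local partitioning: once the grid size is set, the requests are sent to the JMS-backed outputChannel, which is how the master communicates with the slave nodes. Here is the slave-side code: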
/*
 * Configure the worker step
 */
@Bean
public Step workerStep() {
    return this.workerStepBuilderFactory.get("workerStep")
            .inputChannel(requests())
            .tasklet(tasklet(null))
            .build();
}

@Bean
@StepScope
public Tasklet tasklet(@Value("#{stepExecutionContext['partition']}") String partition) {
    return (contribution, chunkContext) -> {
        System.out.println("processing " + partition);
        return RepeatStatus.FINISHED;
    };
}
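The slave node uses inputChannel to listen for messages from JMS and runs the tasklet when one arrives.
Here the master node sends messages to the slaves, but the slaves return no response. If the master should collect the slaves' results, playing the role of the reduce step in Map/Reduce, the master code looks like this: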
@Bean
public Step masterStep() {
    return this.masterStepBuilderFactory.get("masterStep")
            .partitioner("workerStep", new BasicPartitioner())
            .gridSize(GRID_SIZE)
            .outputChannel(requests())
            .inputChannel(replies())
            .build();
}
Note that after outputChannel, the master now also uses inputChannel to receive messages from the replies queue. Here is how the slave sends back its result:
@Bean
public Step workerStep() {
    return this.workerStepBuilderFactory.get("workerStep")
            .inputChannel(requests())
            .outputChannel(replies())
            .tasklet(tasklet(null))
            .build();
}
Here the step reads from inputChannel and then sends its output through outputChannel to the replies queue.
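The request/reply flow between master and slaves can be imitated in a single JVM. In the sketch below, two in-memory queues stand in for the JMS-backed requests and replies channels, and plain threads stand in for the slave nodes. Everything here (the class `RequestReplySketch`, the `STOP` marker, the message strings) is an assumption made for illustration and not part of the Spring Batch sample.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-Java sketch: two in-memory queues stand in for the JMS-backed channels.
class RequestReplySketch {

    private static final String STOP = "STOP"; // hypothetical shutdown marker

    static List<String> run(int gridSize, int workerCount) {
        BlockingQueue<String> requests = new LinkedBlockingQueue<>();
        BlockingQueue<String> replies = new LinkedBlockingQueue<>();

        // Slaves: take a request, "process" it, then post the result to the replies queue.
        List<Thread> workers = new ArrayList<>();
        for (int w = 0; w < workerCount; w++) {
            Thread worker = new Thread(() -> {
                try {
                    String partition;
                    while (!(partition = requests.take()).equals(STOP)) {
                        replies.put("processed " + partition);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
            workers.add(worker);
        }

        List<String> results = new ArrayList<>();
        try {
            // Master: send one request per partition.
            for (int i = 0; i < gridSize; i++) {
                requests.put("partition" + i);
            }
            // Master: collect exactly one reply per partition (the "reduce" side).
            for (int i = 0; i < gridSize; i++) {
                results.add(replies.take());
            }
            // Tell every slave to stop, then wait for the threads to exit.
            for (int w = 0; w < workerCount; w++) {
                requests.put(STOP);
            }
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        Collections.sort(results); // replies arrive in no particular order
        return results;
    }

    public static void main(String[] args) {
        // prints [processed partition0, processed partition1, processed partition2]
        System.out.println(run(3, 2));
    }
}
```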