Spring Batch的远程分块其实是一种主从分布式处理模式,一个主处理机和一个或多个从处理机,这样提高了批处理的计算能力,主从之间是通过消息中间件JMS进行通信。
在远程分块中,Step处理分为主从多个处理过程,主从之间通过一些中间件相互通信。下图显示了该模式:
master主组件是一个单个处理,slave从属组件则是多个远程处理过程。如果主处理不会造成瓶颈,这种模式效果最好,因为一般情况下输出处理必须比读取处理更昂贵(在实践中通常是这种情况)。
主master组件知道如何使用消息的通用版本发送条目item块到消息中间件,slave从属服务器是消息中间件的标准监听器(例如如果使用JMS,则是MesssageListener实现),然后调用ItemWriter或ItemProcessor 来处理条目块。
使用此模式的一个优点是读取器、处理器和输出器等组件是现成的(与用于本地执行步骤的组件相同)。这些项目是动态划分的,工作通过中间件共享,因此,如果监听器都是繁忙消费者,那么将自动加入负载平衡。
Spring批处理通常是和消息系统等中间件集成的,向批处理流程添加消息传递可实现操作的自动化,也可实现方式和策略的分离,比如消息触发要执行的作业,然后以各种方式发送消息。或者,当作业完成或失败时,会触发发送的相应成功或失败的消息等等,消息的使用者可能与批处理应用本身无关。
消息传递也可以嵌入到批处理的作业执行中,远程分区和远程分块提供了将工作负载分配给多个处理器工作。
本项目我们使用spring-batch-integration实现远程分块。
这里我们使用ActiveMQ作为JMS的实现,主处理和从处理之间通过两个队列通讯,requests队列时主节点通过chunkMessageChannelItemWriter发往从节点的,而从节点通过ChunkProcessorChunkHandler接受到消息后,再将结果发往主节点的消息通过replies。
主节点的Job代码如下:
@Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Bean public TaskletStep masterStep() { return this.stepBuilderFactory.get("masterStep") .<Integer, Integer>chunk(3) .reader(itemReader()) .writer(itemWriter()) .build(); } @Bean public Job remoteChunkingJob() { return this.jobBuilderFactory.get("remoteChunkingJob") .start(masterStep()) .build(); } @Bean public ItemReader<Integer> itemReader() { return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6)); } @Bean public ItemWriter<Integer> itemWriter() { MessagingTemplate messagingTemplate = new MessagingTemplate(); messagingTemplate.setDefaultChannel(requests()); ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>(); chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate); chunkMessageChannelItemWriter.setReplyChannel(replies()); return chunkMessageChannelItemWriter; }
与JMS连接的代码如下:
@Value("${broker.url}") private String brokerUrl; @Bean public ActiveMQConnectionFactory jmsConnectionFactory() { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); connectionFactory.setBrokerURL(this.brokerUrl); connectionFactory.setTrustAllPackages(true); return connectionFactory; } /* * Configure outbound flow (requests going to workers) */ @Bean public DirectChannel requests() { return new DirectChannel(); } @Bean public IntegrationFlow outboundFlow(ActiveMQConnectionFactory jmsConnectionFactory) { return IntegrationFlows .from(requests()) .handle(Jms.outboundAdapter(jmsConnectionFactory).destination("requests")) .get(); } /* * Configure inbound flow (replies coming from workers) */ @Bean public QueueChannel replies() { return new QueueChannel(); } @Bean public IntegrationFlow inboundFlow(ActiveMQConnectionFactory jmsConnectionFactory) { return IntegrationFlows .from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory).destination("replies")) .channel(replies()) .get(); }
Application.properties:
broker.url=tcp://localhost:61616 spring.batch.initialize-schema=always spring.datasource.url=jdbc:mysql://localhost:3306/mytest spring.datasource.username=root spring.datasource.password=root spring.datasource.driver-class-name=com.mysql.jdbc.Driver
从节点的Job如下:
/* * Configure worker components */ @Bean public ItemProcessor<Integer, Integer> itemProcessor() { return item -> { System.out.println("processing item " + item); return item; }; } @Bean public ItemWriter<Integer> itemWriter() { return items -> { for (Integer item : items) { System.out.println("writing item " + item + System.currentTimeMillis()); } }; } @Bean @ServiceActivator(inputChannel = "requests", outputChannel = "replies") public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() { ChunkProcessor<Integer> chunkProcessor = new SimpleChunkProcessor<>(itemProcessor(), itemWriter()); ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler = new ChunkProcessorChunkHandler<>(); chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor); return chunkProcessorChunkHandler; }
从节点连接JMS代码类似于主节点,自动实现了JMS的侦听器作用。从节点主要是在chunkProcessorChunkHandler这里做了处理和输出回到主节点。
本节源码: Github
Spring batch