转载

Spring批处理远程分块

Spring Batch的远程分块其实是一种主从分布式处理模式,一个主处理机和一个或多个从处理机,这样提高了批处理的计算能力,主从之间是通过消息中间件JMS进行通信。

在远程分块中,Step处理分为主从多个处理过程,主从之间通过一些中间件相互通信。下图显示了该模式:

master主组件是一个单个处理,slave从属组件则是多个远程处理过程。如果主处理不会造成瓶颈,这种模式效果最好,因为一般情况下输出处理必须比读取处理更昂贵(在实践中通常是这种情况)。

主master组件知道如何使用消息的通用版本发送条目item块到消息中间件,slave从属服务器是消息中间件的标准监听器(例如如果使用JMS,则是MesssageListener实现),然后调用ItemWriter或ItemProcessor 来处理条目块。

使用此模式的一个优点是读取器、处理器和输出器等组件是现成的(与用于本地执行步骤的组件相同)。这些项目是动态划分的,工作通过中间件共享,因此,如果监听器都是繁忙消费者,那么将自动加入负载平衡。

Spring Batch集成

Spring批处理通常是和消息系统等中间件集成的,向批处理流程添加消息传递可实现操作的自动化,也可实现方式和策略的分离,比如消息触发要执行的作业,然后以各种方式发送消息。或者,当作业完成或失败时,会触发发送的相应成功或失败的消息等等,消息的使用者可能与批处理应用本身无关。

消息传递也可以嵌入到批处理的作业执行中,远程分区和远程分块提供了将工作负载分配给多个处理器工作。

本项目我们使用spring-batch-integration实现远程分块。

这里我们使用ActiveMQ作为JMS的实现,主处理和从处理之间通过两个队列通讯,requests队列时主节点通过chunkMessageChannelItemWriter发往从节点的,而从节点通过ChunkProcessorChunkHandler接受到消息后,再将结果发往主节点的消息通过replies。

主节点的Job代码如下:

@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
   
@Bean
public TaskletStep masterStep() {
   return this.stepBuilderFactory.get("masterStep")
         .<Integer, Integer>chunk(3)
         .reader(itemReader())
         .writer(itemWriter())
         .build();
}

@Bean
public Job remoteChunkingJob() {
   return this.jobBuilderFactory.get("remoteChunkingJob")
         .start(masterStep())
         .build();
}

@Bean
public ItemReader<Integer> itemReader() {
   return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6));
}

@Bean
public ItemWriter<Integer> itemWriter() {
   MessagingTemplate messagingTemplate = new MessagingTemplate();
   messagingTemplate.setDefaultChannel(requests());
   ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>();
   chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
   chunkMessageChannelItemWriter.setReplyChannel(replies());
   return chunkMessageChannelItemWriter;
}

与JMS连接的代码如下:

@Value("${broker.url}")
private String brokerUrl;


@Bean
public ActiveMQConnectionFactory jmsConnectionFactory() {
   ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
   connectionFactory.setBrokerURL(this.brokerUrl);
   connectionFactory.setTrustAllPackages(true);
   return connectionFactory;
}

/*
 * Configure outbound flow (requests going to workers)
 */

@Bean
public DirectChannel requests() {
   return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory jmsConnectionFactory) {
   return IntegrationFlows
         .from(requests())
         .handle(Jms.outboundAdapter(jmsConnectionFactory).destination("requests"))
         .get();
}

/*
 * Configure inbound flow (replies coming from workers)
 */

@Bean
public QueueChannel replies() {
   return new QueueChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory jmsConnectionFactory) {
   return IntegrationFlows
         .from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory).destination("replies"))
         .channel(replies())
         .get();
}

Application.properties:

broker.url=tcp://localhost:61616

spring.batch.initialize-schema=always

spring.datasource.url=jdbc:mysql://localhost:3306/mytest
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

从节点的Job如下:

/*
 * Configure worker components
 */

@Bean
public ItemProcessor<Integer, Integer> itemProcessor() {
   return item -> {
      System.out.println("processing item " + item);
      return item;
   };
}

@Bean
public ItemWriter<Integer> itemWriter() {
   return items -> {
      for (Integer item : items) {
         System.out.println("writing item " + item + System.currentTimeMillis());
      }
   };
}

@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
   ChunkProcessor<Integer> chunkProcessor = new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
   ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler = new ChunkProcessorChunkHandler<>();
   chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
   return chunkProcessorChunkHandler;
}

从节点连接JMS代码类似于主节点,自动实现了JMS的侦听器作用。从节点主要是在chunkProcessorChunkHandler这里做了处理和输出回到主节点。

本节源码: Github

Spring batch

原文  https://www.jdon.com/springboot/spring-batch-remote-chunk.html
正文到此结束
Loading...