在SPRING BATCH remote chunking模式下,如果要在同一时间处理多个文件,按DEMO的默认配置是会报错的。原因是处理多个文件的各个MASTER使用的是同一个QUEUE名,这样SLAVE在处理多个JOB INSTANCE时,返回的结果会带有不同的JOB-INSTANCE-ID,MASTER收到与自己不一致的JOB-INSTANCE-ID,从而导致报错。
这时需更改SPRING BATCH使用SPRING INTEGRATION的模式中的GATEWAY组件。
GATEWAY组件是工作在REQUEST/RESPONSE模式下,即发一个MESSAGE到某一QUEUE时,要从REPLY QUEUE等到CONSUMER返回结果时,才往下继续。
OUTBOUND GATEWAY:从某一CHANNEL获取MESSAGE,发往REQUEST QUEUE,从REPLY QUEUE等到CONSUMER返回结果,将此MESSAGE发往下一CHANNEL。
INBOUND GATEWAY:从某一QUEUE获取MESSAGE,发往某一REQUEST CHANNEL,从REPLY CHANNEL等到返回结果,将此MESSAGE发往下一QUEUE。
详情参见此文: https://blog.csdn.net/alexlau8/article/details/78056064 。
<!-- Master jms -->
<!-- Messages sent to this channel are dispatched to subscribers through the RequestPublishExecutor pool. -->
<int:channel id="MasterRequestChannel">
    <int:dispatcher task-executor="RequestPublishExecutor"/>
</int:channel>
<task:executor id="RequestPublishExecutor" pool-size="5-10" queue-capacity="0"/>

<!-- Previous one-way request adapter, superseded by the outbound gateway below; kept for reference.
<int-jms:outbound-channel-adapter
    connection-factory="connectionFactory"
    destination-name="RequestQueue"/>
-->

<int:channel id="MasterReplyChannel"/>

<!-- Previous reply listener adapter, superseded by the outbound gateway below; kept for reference.
<int-jms:message-driven-channel-adapter
    connection-factory="connectionFactory"
    destination-name="ReplyQueue"/>
-->

<!--
    Request/response gateway: takes a message from MasterRequestChannel, sends it to
    RequestQueue, waits for the correlated reply on ReplyQueue and forwards it to
    MasterReplyChannel.
    NOTE(review): only connection-factory and the reply-listener child survived in the
    original snippet. The remaining attributes mirror the Java JmsOutboundGateway bean and
    the slave inbound gateway shown in this article; confirm them before use.
-->
<int-jms:outbound-gateway
    connection-factory="connectionFactory"
    correlation-key="JMSCorrelationID"
    request-channel="MasterRequestChannel"
    request-destination-name="RequestQueue"
    reply-channel="MasterReplyChannel"
    reply-destination-name="ReplyQueue"
    receive-timeout="30000"
    requires-reply="true"
    async="true">
    <int-jms:reply-listener/>
</int-jms:outbound-gateway>
<!-- Slave jms -->
<int:channel id="SlaveRequestChannel"/>

<!-- Previous request listener adapter, superseded by the inbound gateway below; kept for reference.
<int-jms:message-driven-channel-adapter
    connection-factory="connectionFactory"
    destination-name="RequestQueue"/>
-->

<int:channel id="SlaveReplyChannel"/>

<!-- Previous one-way reply adapter, superseded by the inbound gateway below; kept for reference.
<int-jms:outbound-channel-adapter
    connection-factory="connectionFactory"
    destination-name="ReplyQueue"/>
-->

<!--
    Request/response gateway: receives a message from RequestQueue, passes it to
    SlaveRequestChannel, waits for the result on SlaveReplyChannel and sends it back,
    using ReplyQueue as the default reply queue.
-->
<int-jms:inbound-gateway
    connection-factory="connectionFactory"
    correlation-key="JMSCorrelationID"
    request-channel="SlaveRequestChannel"
    request-destination-name="RequestQueue"
    reply-channel="SlaveReplyChannel"
    default-reply-queue-name="ReplyQueue"/>
MASTER配置:
package com.paul.testspringbatch.config.master;
import javax.jms.ConnectionFactory;
import org.springframework.beans.factory.config.CustomScopeConfigurer;
//
import org.springframework.batch.core.configuration.annotation.StepScope;
import
org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.context.annotation.Scope;
import org.springframework.context.support.SimpleThreadScope;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.JmsOutboundGateway;
import com.paul.testspringbatch.common.constant.IntegrationConstant;
@Configuration
@EnableIntegration
@Profile("batch-master")
public class IntegrationMasterConfiguration {
@Value("${broker.url}")
//private String brokerUrl;
//@Bean
//public ActiveMQConnectionFactory connectionFactory() {
//ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
//connectionFactory.setBrokerURL(this.brokerUrl);
//connectionFactory.setTrustAllPackages(true);
//return connectionFactory;
// }* Configure outbound flow (requests going to workers)
*/@Bean
//
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public
DirectChannel requests() {
return new DirectChannel();
}
@Bean
//public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
//return IntegrationFlows
//.from(requests())
//.handle(Jms.outboundAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REQUEST_DESTINATION))
//.get();
// }@Bean
public CustomScopeConfigurer customScopeConfigurer() {
CustomScopeConfigurer customScopeConfigurer = new CustomScopeConfigurer();
customScopeConfigurer.addScope("thread", new SimpleThreadScope());
return customScopeConfigurer;
}
@Bean
//public static BeanFactoryPostProcessor beanFactoryPostProcessor() {
//return new BeanFactoryPostProcessor() {
//@Override
//public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
//beanFactory.registerScope("thread", new SimpleThreadScope());
//}
//};
// }* Configure inbound flow (replies coming from workers)
*/@Bean
@Scope(value = "thread" /* , proxyMode = ScopedProxyMode.NO */ )
//
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public
QueueChannel replies() {
return new QueueChannel();
}
@Bean
//public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
//return IntegrationFlows
//.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REPLY_DESTINATION))
//.channel(replies())
//.get();
// }@Bean
public JmsOutboundGateway jmsOutboundGateway(ConnectionFactory connectionFactory) {
JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway();
jmsOutboundGateway.setConnectionFactory(connectionFactory);
jmsOutboundGateway.setRequestDestinationName(IntegrationConstant.MASTER_REQUEST_DESTINATION); //
2. send the message to this destination
jmsOutboundGateway.setRequiresReply( true
);
jmsOutboundGateway.setCorrelationKey(IntegrationConstant.JMS_CORRELATION_KEY); //
3. let the broker filter the message
jmsOutboundGateway.setAsync( true
); //
must be async, so that JMS_CORRELATION_KEY work
jmsOutboundGateway.setUseReplyContainer( true
);
jmsOutboundGateway.setReplyDestinationName(IntegrationConstant.MASTER_REPLY_DESTINATION); //
4. waiting the response from this destination
jmsOutboundGateway.setReceiveTimeout(30_000);
return jmsOutboundGateway;
}
@Bean
public IntegrationFlow jmsOutboundGatewayFlow(ConnectionFactory connectionFactory) {
return IntegrationFlows
.from(requests()) //
1. receive message from this channel
.handle(jmsOutboundGateway(connectionFactory))
.channel(replies()) //
5. send back the response to this channel
.get();
}
}
SLAVE配置:
package com.paul.testspringbatch.config.slave;
import javax.jms.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;
import com.paul.testspringbatch.common.constant.IntegrationConstant;
@Configuration
@EnableIntegration
@Profile("batch-slave")
public class IntegrationSlaveConfiguration {
* Configure inbound flow (requests coming from the master)
*/@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
//public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
//return IntegrationFlows
//.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
//.channel(requests())
//.get();
// }* Configure outbound flow (replies going to the master)
*/@Bean
public DirectChannel replies() {
return new DirectChannel();
}
@Bean
//public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
//return IntegrationFlows
//.from(replies())
//.handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
//.get();
// }@Bean
public IntegrationFlow inboundGatewayFlow(ConnectionFactory connectionFactory) {
return IntegrationFlows
.from(Jms
.inboundGateway(connectionFactory)
.destination(IntegrationConstant.SLAVE_HANDLE_MASTER_REQUEST_DESTINATION) //
1. receive message from this channel.
.correlationKey(IntegrationConstant.JMS_CORRELATION_KEY) //
2. let the broker filter the message
.requestChannel(requests()) //
3. send the message to this channel
.replyChannel(replies()) //
4. waitting the result from this channel
.defaultReplyQueueName(IntegrationConstant.SLAVE_RETURN_RESULT_DESTINATION) //
5.send back the result to this destination to the master.
)
.get();
}
}