转载

SPRING BATCH remote chunking模式下可同时处理多文件

SPRING BATCH remote chunking模式下,如果要同一时间处理多个文件,按DEMO的默认配置,是会报错的,这是由于多个文件的处理的MASTER方,是用同一个QUEUE名,这样SLAVE中处理多个JOB INSTANCE时,会返回不同的JOB-INSTANCE-ID,导致报错。

这时需更改SPRING BATCH使用SPRING INTEGRATION的模式中的GATEWAY组件。

GATEWAY组件是工作在REQUEST/RESPONSE模式下,即发一个MESSAGE到某一QUEUE时,要从REPLY QUEUE等到CONSUMER返回结果时,才往下继续。

OUTBOUND GATEWAY:从某一CHANNEL获取MESSAGE,发往REQUEST QUEUE,从REPLY QUEUE等到CONSUMER返回结果,将此MESSAGE发往下一CHANNEL。

INBOUND GATEWAY:从某一QUEUE获取MESSAGE,发往某一REQUEST CHANNEL,从REPLY CHANNEL等到返回结果,将此MESSAGE发往下一QUEUE。

详情参见此文: https://blog.csdn.net/alexlau8/article/details/78056064 。

<!--  Master jms  -->

< int:channel  id ="MasterRequestChannel" >

< int:dispatcher  task-executor ="RequestPublishExecutor" />

</ int:channel >

< task:executor  id ="RequestPublishExecutor"  pool-size ="5-10"  queue-capacity ="0" />

<!--

<int-jms:outbound-channel-adapter 

connection-factory="connectionFactory" 

destination-name="RequestQueue" 

-->

< int:channel  id ="MasterReplyChannel" />

<!--

<int-jms:message-driven-channel-adapter 

connection-factory="connectionFactory" 

destination-name="ReplyQueue"

-->
<

int-jms:outbound-gateway

connection-factory ="connectionFactory"
="JMSCorrelationID"
="MasterRequestChannel"
="RequestQueue"
="30000"
="MasterReplyChannel"
="ReplyQueue"
="true" >

< int-jms:reply-listener  />

</ int-jms:outbound-gateway >

<!--  Slave jms  -->

< int:channel  id ="SlaveRequestChannel" />

<!--

<int-jms:message-driven-channel-adapter

connection-factory="connectionFactory" 

destination-name="RequestQueue"

-->

< int:channel  id ="SlaveReplyChannel" />

<!--

<int-jms:outbound-channel-adapter 

connection-factory="connectionFactory" 

destination-name="ReplyQueue"

-->
<

int-jms:inbound-gateway

connection-factory ="connectionFactory"         correlation-key ="JMSCorrelationID"         request-channel ="SlaveRequestChannel"         request-destination-name ="RequestQueue"         reply-channel ="SlaveReplyChannel"         default-reply-queue-name ="ReplyQueue"

/>

MASTER配置

package com.paul.testspringbatch.config.master;

import javax.jms.ConnectionFactory;

import org.springframework.beans.factory.config.CustomScopeConfigurer;

// import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.context.annotation.Profile;

import org.springframework.context.annotation.Scope;

import org.springframework.context.support.SimpleThreadScope;

import org.springframework.integration.channel.DirectChannel;

import org.springframework.integration.channel.QueueChannel;

import org.springframework.integration.config.EnableIntegration;

import org.springframework.integration.dsl.IntegrationFlow;

import org.springframework.integration.dsl.IntegrationFlows;

import org.springframework.integration.jms.JmsOutboundGateway;

import com.paul.testspringbatch.common.constant.IntegrationConstant;

@Configuration

@EnableIntegration

@Profile("batch-master")

public class IntegrationMasterConfiguration {

//

@Value("${broker.url}")

//

private String brokerUrl;

//

@Bean

//

public ActiveMQConnectionFactory connectionFactory() {

//

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

//

connectionFactory.setBrokerURL(this.brokerUrl);

//

connectionFactory.setTrustAllPackages(true);

//

return connectionFactory;

//     }
/*

* Configure outbound flow (requests going to workers)

*/

@Bean

//     @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public DirectChannel requests() {

return new DirectChannel();

}

//

@Bean

//

public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {

//

return IntegrationFlows

//

.from(requests())

//

.handle(Jms.outboundAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REQUEST_DESTINATION))

//

.get();

//     }

@Bean

public CustomScopeConfigurer customScopeConfigurer() {

CustomScopeConfigurer customScopeConfigurer =  new CustomScopeConfigurer();

customScopeConfigurer.addScope("thread",  new SimpleThreadScope());

return customScopeConfigurer;

}

//

@Bean

//

public static BeanFactoryPostProcessor beanFactoryPostProcessor() {

//

return new BeanFactoryPostProcessor() {

//
//

@Override

//

public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

//

beanFactory.registerScope("thread", new SimpleThreadScope());

//

}

//

};

//      }
/*

* Configure inbound flow (replies coming from workers)

*/

@Bean

@Scope(value = "thread" /*  , proxyMode = ScopedProxyMode.NO  */ )

//     @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public QueueChannel replies() {

return new QueueChannel();

}

//

@Bean

//

public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {

//

return IntegrationFlows

//

.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REPLY_DESTINATION))

//

.channel(replies())

//

.get();

//     }

@Bean

public JmsOutboundGateway jmsOutboundGateway(ConnectionFactory connectionFactory) {

JmsOutboundGateway jmsOutboundGateway =  new JmsOutboundGateway();

jmsOutboundGateway.setConnectionFactory(connectionFactory);

jmsOutboundGateway.setRequestDestinationName(IntegrationConstant.MASTER_REQUEST_DESTINATION); // 2. send the message to this destination
jmsOutboundGateway.setRequiresReply( true );

jmsOutboundGateway.setCorrelationKey(IntegrationConstant.JMS_CORRELATION_KEY); // 3. let the broker filter the message
jmsOutboundGateway.setAsync( true ); // must be async, so that JMS_CORRELATION_KEY work
jmsOutboundGateway.setUseReplyContainer( true );

jmsOutboundGateway.setReplyDestinationName(IntegrationConstant.MASTER_REPLY_DESTINATION); // 4. waiting the response from this destination
jmsOutboundGateway.setReceiveTimeout(30_000);

return jmsOutboundGateway;

}

@Bean

public IntegrationFlow jmsOutboundGatewayFlow(ConnectionFactory connectionFactory) {

return IntegrationFlows

.from(requests()) // 1. receive message from this channel
.handle(jmsOutboundGateway(connectionFactory))

.channel(replies()) // 5. send back the response to this channel
.get();

}

}

SLAVE配置:

package com.paul.testspringbatch.config.slave;

import javax.jms.ConnectionFactory;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.context.annotation.Profile;

import org.springframework.integration.channel.DirectChannel;

import org.springframework.integration.config.EnableIntegration;

import org.springframework.integration.dsl.IntegrationFlow;

import org.springframework.integration.dsl.IntegrationFlows;

import org.springframework.integration.jms.dsl.Jms;

import com.paul.testspringbatch.common.constant.IntegrationConstant;

@Configuration

@EnableIntegration

@Profile("batch-slave")

public class IntegrationSlaveConfiguration {

/*

* Configure inbound flow (requests coming from the master)

*/

@Bean

public DirectChannel requests() {

return new DirectChannel();

}

//

@Bean

//

public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {

//

return IntegrationFlows

//

.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))

//

.channel(requests())

//

.get();

//     }
/*

* Configure outbound flow (replies going to the master)

*/

@Bean

public DirectChannel replies() {

return new DirectChannel();

}

//

@Bean

//

public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {

//

return IntegrationFlows

//

.from(replies())

//

.handle(Jms.outboundAdapter(connectionFactory).destination("replies"))

//

.get();

//     }

@Bean

public IntegrationFlow inboundGatewayFlow(ConnectionFactory connectionFactory) {

return IntegrationFlows

.from(Jms

.inboundGateway(connectionFactory)

.destination(IntegrationConstant.SLAVE_HANDLE_MASTER_REQUEST_DESTINATION) // 1. receive message from this channel.
.correlationKey(IntegrationConstant.JMS_CORRELATION_KEY) // 2. let the broker filter the message
.requestChannel(requests()) // 3. send the message to this channel
.replyChannel(replies()) // 4. waitting the result from this channel
.defaultReplyQueueName(IntegrationConstant.SLAVE_RETURN_RESULT_DESTINATION) // 5.send back the result to this destination to the master.
)

.get();

}

}

原文  http://www.blogjava.net/paulwong/archive/2019/07/16/434210.html
正文到此结束
Loading...