The Spring Integration Java DSL has been merged into Spring Integration Core 5.0, which is a smart and obvious move.
Let's see how to use it with an ActiveMQ JMS-based example.
Maven dependencies:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-jms</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-kahadb-store</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-java-dsl -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-java-dsl</artifactId>
        <version>1.2.3.RELEASE</version>
    </dependency>
</dependencies>
Sample 1: JMS inbound gateway
We have the following ServiceActivator:
@Service
public class ActiveMQEndpoint {

    @ServiceActivator(inputChannel = "inboundChannel")
    public void processMessage(final String inboundPayload) {
        System.out.println("Inbound message: " + inboundPayload);
    }
}
If you want to deliver the inboundPayload from a JMS queue to this activator in gateway style with the SI Java DSL, use the DSL Jms factory:
@Bean
public DynamicDestinationResolver dynamicDestinationResolver() {
    return new DynamicDestinationResolver();
}

@Bean
public ActiveMQConnectionFactory connectionFactory() {
    return new ActiveMQConnectionFactory();
}

@Bean
public DefaultMessageListenerContainer listenerContainer() {
    final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
    defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
    defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
    defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test");
    return defaultMessageListenerContainer;
}

@Bean
public MessageChannel inboundChannel() {
    return MessageChannels.direct("inboundChannel").get();
}

@Bean
public JmsInboundGateway dataEndpoint() {
    return Jms.inboundGateway(listenerContainer())
            .requestChannel(inboundChannel())
            .get();
}
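In the dataEndpoint bean, Jms.inboundGateway(...) returns a JmsInboundGatewaySpec, through which you can also configure replies to an SI channel or a JMS destination. See the documentation.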
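As a rough illustration of the reply option mentioned above, the gateway could be given a default reply queue. This is only a sketch under several assumptions: it uses the Spring Integration 5.x package for Jms, it is meant as an alternative to (not an addition to) the dataEndpoint bean, the queue name jms.activeMQ.Test.reply is hypothetical, and the service activator would have to return a value instead of void for any reply to be produced.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jms.JmsInboundGateway;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.messaging.MessageChannel;

@Configuration
public class ReplyingGatewayConfig {

    // Alternative to the dataEndpoint bean: same request channel, plus a
    // default reply queue (hypothetical name) used when the incoming JMS
    // message does not carry its own JMSReplyTo destination.
    @Bean
    public JmsInboundGateway replyingEndpoint(DefaultMessageListenerContainer listenerContainer,
                                              MessageChannel inboundChannel) {
        return Jms.inboundGateway(listenerContainer)
                .requestChannel(inboundChannel)
                .defaultReplyQueueName("jms.activeMQ.Test.reply")
                .get();
    }
}
```

The listener container and channel are injected as method parameters here, so the sketch relies on the listenerContainer and inboundChannel beans shown in Sample 1.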
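Sample 2: JMS message-driven channel adapter
If you are looking for a replacement for the XML JMS configuration of a message-driven channel adapter, JmsMessageDrivenChannelAdapter is the way to go: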
@Bean
public DynamicDestinationResolver dynamicDestinationResolver() {
    return new DynamicDestinationResolver();
}

@Bean
public ActiveMQConnectionFactory connectionFactory() {
    return new ActiveMQConnectionFactory();
}

@Bean
public DefaultMessageListenerContainer listenerContainer() {
    final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
    defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
    defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
    defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test");
    return defaultMessageListenerContainer;
}

@Bean
public MessageChannel inboundChannel() {
    return MessageChannels.direct("inboundChannel").get();
}

@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
    final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
            new ChannelPublishingJmsMessageListener();
    channelPublishingJmsMessageListener.setExpectReply(false);
    final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter =
            new JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener);
    messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
    return messageDrivenChannelAdapter;
}
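As in the previous example, the inbound payload is delivered to the activator shown in Sample 1.
Sample 3: JMS message-driven channel adapter with JAXB
In a typical scenario you want to accept XML as a text message over JMS, convert it to JAXB stubs, and process it in a service activator. I will show you how to do this with the SI Java DSL, but first let's add two dependencies for XML processing: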
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-xml</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
</dependency>
We will accept ship orders over JMS, so we start with an XSD named shiporder.xsd:
<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="shiporder">
        <xs:complexType>
            <xs:sequence>
                <xs:element name="orderperson" type="xs:string"/>
                <xs:element name="shipto">
                    <xs:complexType>
                        <xs:sequence>
                            <xs:element name="name" type="xs:string"/>
                            <xs:element name="address" type="xs:string"/>
                            <xs:element name="city" type="xs:string"/>
                            <xs:element name="country" type="xs:string"/>
                        </xs:sequence>
                    </xs:complexType>
                </xs:element>
                <xs:element name="item" maxOccurs="unbounded">
                    <xs:complexType>
                        <xs:sequence>
                            <xs:element name="title" type="xs:string"/>
                            <xs:element name="note" type="xs:string" minOccurs="0"/>
                            <xs:element name="quantity" type="xs:positiveInteger"/>
                            <xs:element name="price" type="xs:decimal"/>
                        </xs:sequence>
                    </xs:complexType>
                </xs:element>
            </xs:sequence>
            <xs:attribute name="orderid" type="xs:string" use="required"/>
        </xs:complexType>
    </xs:element>
</xs:schema>
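Before wiring anything into JMS, it can help to confirm that a candidate message actually conforms to this schema. The sketch below is not part of the original sample: it uses the JDK's own javax.xml.validation API rather than JAXB or Spring, the class name ShipOrderXsdCheck is made up for illustration, and the schema is inlined as a string (with single-quoted attributes) only to keep the example self-contained.

```java
import java.io.IOException;
import java.io.StringReader;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.xml.sax.SAXException;

final class ShipOrderXsdCheck {

    // Inlined copy of shiporder.xsd; in a real project you would load the file instead.
    static final String XSD =
        "<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'>"
        + "<xs:element name='shiporder'><xs:complexType><xs:sequence>"
        + "<xs:element name='orderperson' type='xs:string'/>"
        + "<xs:element name='shipto'><xs:complexType><xs:sequence>"
        + "<xs:element name='name' type='xs:string'/>"
        + "<xs:element name='address' type='xs:string'/>"
        + "<xs:element name='city' type='xs:string'/>"
        + "<xs:element name='country' type='xs:string'/>"
        + "</xs:sequence></xs:complexType></xs:element>"
        + "<xs:element name='item' maxOccurs='unbounded'><xs:complexType><xs:sequence>"
        + "<xs:element name='title' type='xs:string'/>"
        + "<xs:element name='note' type='xs:string' minOccurs='0'/>"
        + "<xs:element name='quantity' type='xs:positiveInteger'/>"
        + "<xs:element name='price' type='xs:decimal'/>"
        + "</xs:sequence></xs:complexType></xs:element>"
        + "</xs:sequence>"
        + "<xs:attribute name='orderid' type='xs:string' use='required'/>"
        + "</xs:complexType></xs:element></xs:schema>";

    // Returns true when the XML text validates against the schema above.
    static boolean isValid(String xml) {
        try {
            SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
            Schema schema = factory.newSchema(new StreamSource(new StringReader(XSD)));
            // With no custom error handler, a validation error is thrown as a SAXException.
            schema.newValidator().validate(new StreamSource(new StringReader(xml)));
            return true;
        } catch (SAXException | IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String order = "<shiporder orderid='889923'>"
            + "<orderperson>John Smith</orderperson>"
            + "<shipto><name>Ola Nordmann</name><address>Langgt 23</address>"
            + "<city>4000 Stavanger</city><country>Norway</country></shipto>"
            + "<item><title>Empire Burlesque</title><quantity>1</quantity><price>10.90</price></item>"
            + "</shiporder>";
        System.out.println(isValid(order));
        // Missing the required orderid attribute and the shipto/item elements.
        System.out.println(isValid("<shiporder><orderperson>x</orderperson></shiporder>"));
    }
}
```

A check like this only tells you the message matches the schema; the conversion to stub classes is still done by the JAXB marshaller configured in the flow.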
Add the JAXB Maven plugin to generate the JAXB stubs:
<plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>jaxb2-maven-plugin</artifactId>
    <version>2.3.1</version>
    <executions>
        <execution>
            <id>xjc-schema1</id>
            <goals>
                <goal>xjc</goal>
            </goals>
            <configuration>
                <!-- XSD files used as sources. -->
                <sources>
                    <source>src/main/resources/xsds/shiporder.xsd</source>
                </sources>
                <!-- Package name of the generated sources. -->
                <packageName>com.example.stubs</packageName>
                <outputDirectory>src/main/java</outputDirectory>
                <clearOutputDir>false</clearOutputDir>
            </configuration>
        </execution>
    </executions>
</plugin>
With the stub classes and everything else in place, here is the Java DSL JMS message-driven adapter with the JAXB magic:
/**
 * Sample 3: Jms message driven adapter with JAXB
 */
@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
    final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
            new ChannelPublishingJmsMessageListener();
    channelPublishingJmsMessageListener.setExpectReply(false);
    channelPublishingJmsMessageListener.setMessageConverter(
            new MarshallingMessageConverter(shipOrdersMarshaller()));
    final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter =
            new JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener);
    messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
    return messageDrivenChannelAdapter;
}

@Bean
public Jaxb2Marshaller shipOrdersMarshaller() {
    Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
    marshaller.setContextPath("com.example.stubs");
    return marshaller;
}
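Doing what used to be XML configuration in Java gives you this kind of power and flexibility. To complete the sample, the service activator for inboundChannel looks like this: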
/**
 * Sample 3
 * @param shiporder
 */
@ServiceActivator(inputChannel = "inboundChannel")
public void processMessage(final Shiporder shiporder) {
    System.out.println(shiporder.getOrderid());
    System.out.println(shiporder.getOrderperson());
}
To test the flow, you can send the following XML to the JMS queue via JConsole:
<?xml version="1.0" encoding="UTF-8"?>
<shiporder orderid="889923"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:noNamespaceSchemaLocation="shiporder.xsd">
    <orderperson>John Smith</orderperson>
    <shipto>
        <name>Ola Nordmann</name>
        <address>Langgt 23</address>
        <city>4000 Stavanger</city>
        <country>Norway</country>
    </shipto>
    <item>
        <title>Empire Burlesque</title>
        <note>Special Edition</note>
        <quantity>1</quantity>
        <price>10.90</price>
    </item>
    <item>
        <title>Hide your heart</title>
        <quantity>1</quantity>
        <price>9.90</price>
    </item>
</shiporder>
For a quick overview of how to use ActiveMQ with JConsole, see this tutorial.
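If you would rather push the test message from code than from JConsole, a JmsTemplate can do it. The following is a sketch, not part of the original sample: it assumes Spring Boot has auto-configured a JmsTemplate against the same broker, the class name TestMessageSender is made up, and the message is a shortened ship order with a single item. Note that it sends one message on every application start.

```java
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class TestMessageSender {

    // Shortened ship order; attributes use single quotes to avoid escaping.
    private static final String SHIP_ORDER_XML =
        "<shiporder orderid='889923'>"
        + "<orderperson>John Smith</orderperson>"
        + "<shipto><name>Ola Nordmann</name><address>Langgt 23</address>"
        + "<city>4000 Stavanger</city><country>Norway</country></shipto>"
        + "<item><title>Empire Burlesque</title><quantity>1</quantity><price>10.90</price></item>"
        + "</shiporder>";

    // Runs once after startup and sends the XML as a JMS text message
    // to the queue the listener container is subscribed to.
    @Bean
    public CommandLineRunner sendTestShipOrder(JmsTemplate jmsTemplate) {
        return args -> jmsTemplate.convertAndSend("jms.activeMQ.Test", SHIP_ORDER_XML);
    }
}
```

Because the payload is a String, the default message converter sends it as a text message, which is the form the sample expects to receive.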
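Sample 4: JMS message-driven channel adapter with JAXB and payload root routing
Another typical scenario is to accept XML as a JMS text message, convert it to JAXB stubs, and route the payload to a service activator based on the payload root type. The SI Java DSL supports all kinds of routing, of course; I will show you how to route by payload type.
First, add the following XSD to the folder that contains shiporder.xsd and name it purchaseorder.xsd: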
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
            xmlns:tns="http://tempuri.org/PurchaseOrderSchema.xsd"
            targetNamespace="http://tempuri.org/PurchaseOrderSchema.xsd"
            elementFormDefault="qualified">
    <xsd:element name="PurchaseOrder">
        <xsd:complexType>
            <xsd:sequence>
                <xsd:element name="ShipTo" type="tns:USAddress" maxOccurs="2"/>
                <xsd:element name="BillTo" type="tns:USAddress"/>
            </xsd:sequence>
            <xsd:attribute name="OrderDate" type="xsd:date"/>
        </xsd:complexType>
    </xsd:element>
    <xsd:complexType name="USAddress">
        <xsd:sequence>
            <xsd:element name="name" type="xsd:string"/>
            <xsd:element name="street" type="xsd:string"/>
            <xsd:element name="city" type="xsd:string"/>
            <xsd:element name="state" type="xsd:string"/>
            <xsd:element name="zip" type="xsd:integer"/>
        </xsd:sequence>
        <xsd:attribute name="country" type="xsd:NMTOKEN" fixed="US"/>
    </xsd:complexType>
</xsd:schema>
Then add it to the JAXB Maven plugin configuration:
<plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>jaxb2-maven-plugin</artifactId>
    <version>2.3.1</version>
    <executions>
        <execution>
            <id>xjc-schema1</id>
            <goals>
                <goal>xjc</goal>
            </goals>
            <configuration>
                <!-- XSD files used as sources. -->
                <sources>
                    <source>src/main/resources/xsds/shiporder.xsd</source>
                    <source>src/main/resources/xsds/purchaseorder.xsd</source>
                </sources>
                <!-- Package name of the generated sources. -->
                <packageName>com.example.stubs</packageName>
                <outputDirectory>src/main/java</outputDirectory>
                <clearOutputDir>false</clearOutputDir>
            </configuration>
        </execution>
    </executions>
</plugin>
Run mvn clean install to generate the JAXB stubs for the new XSD. Now for the promised payload root mapping:
@Bean
public Jaxb2Marshaller ordersMarshaller() {
    Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
    marshaller.setContextPath("com.example.stubs");
    return marshaller;
}

/**
 * Sample 4: Jms message driven adapter with Jaxb and Payload routing.
 * @return
 */
@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
    final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
            new ChannelPublishingJmsMessageListener();
    channelPublishingJmsMessageListener.setMessageConverter(
            new MarshallingMessageConverter(ordersMarshaller()));
    final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter =
            new JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener);
    messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
    return messageDrivenChannelAdapter;
}

@Bean
public IntegrationFlow payloadRootMapping() {
    return IntegrationFlows.from(inboundChannel())
            .<Object, Class<?>>route(Object::getClass, m -> m
                    .subFlowMapping(Shiporder.class, sf -> sf.handle((MessageHandler) message -> {
                        final Shiporder shiporder = (Shiporder) message.getPayload();
                        System.out.println(shiporder.getOrderperson());
                        System.out.println(shiporder.getOrderid());
                    }))
                    .subFlowMapping(PurchaseOrder.class, sf -> sf.handle((MessageHandler) message -> {
                        final PurchaseOrder purchaseOrderType = (PurchaseOrder) message.getPayload();
                        System.out.println(purchaseOrderType.getBillTo().getName());
                    }))
            ).get();
}
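Note the payloadRootMapping bean. The important parts are these: <Object, Class<?>>route(Object::getClass, ...) says that the payload arriving on inboundChannel is treated as an Object and that its Class is used as the routing key; subFlowMapping(Shiporder.class, ...) defines the subflow that handles Shiporder payloads; subFlowMapping(PurchaseOrder.class, ...) defines the subflow that handles PurchaseOrder payloads.
To test the Shiporder payload, use the XML from Sample 3; to test the PurchaseOrder payload, use the following XML: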
<?xml version="1.0" encoding="utf-8"?>
<PurchaseOrder OrderDate="1900-01-01" xmlns="http://tempuri.org/PurchaseOrderSchema.xsd">
    <ShipTo country="US">
        <name>name1</name>
        <street>street1</street>
        <city>city1</city>
        <state>state1</state>
        <zip>1</zip>
    </ShipTo>
    <ShipTo country="US">
        <name>name2</name>
        <street>street2</street>
        <city>city2</city>
        <state>state2</state>
        <zip>-79228162514264337593543950335</zip>
    </ShipTo>
    <BillTo country="US">
        <name>name1</name>
        <street>street1</street>
        <city>city1</city>
        <state>state1</state>
        <zip>1</zip>
    </BillTo>
</PurchaseOrder>
Both payloads should be routed according to the subflow mappings.
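To make the routing idea concrete without any Spring machinery, here is a plain-Java sketch of dispatching on the payload class. It is only a conceptual stand-in for what the router does, not how Spring Integration implements it: PayloadTypeRouter, DemoShiporder, and DemoPurchaseOrder are hypothetical names invented for this illustration (the real flow uses the generated JAXB stubs), and the handlers return strings purely so the result is easy to check.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for the generated JAXB stub classes.
final class DemoShiporder {
    final String orderId;
    DemoShiporder(String orderId) { this.orderId = orderId; }
}

final class DemoPurchaseOrder {
    final String billToName;
    DemoPurchaseOrder(String billToName) { this.billToName = billToName; }
}

// Looks up a handler by the exact runtime class of the payload.
final class PayloadTypeRouter {
    private final Map<Class<?>, Function<Object, String>> mappings = new LinkedHashMap<>();

    // Registers a handler for one payload type; the cast is safe because
    // route() only invokes it for payloads of exactly that class.
    <T> PayloadTypeRouter mapping(Class<T> type, Function<? super T, String> handler) {
        mappings.put(type, payload -> handler.apply(type.cast(payload)));
        return this;
    }

    // Uses payload.getClass() as the routing key, like route(Object::getClass, ...).
    String route(Object payload) {
        Function<Object, String> handler = mappings.get(payload.getClass());
        if (handler == null) {
            throw new IllegalStateException("No mapping for " + payload.getClass().getName());
        }
        return handler.apply(payload);
    }
}

final class PayloadTypeRouterDemo {
    public static void main(String[] args) {
        PayloadTypeRouter router = new PayloadTypeRouter()
            .mapping(DemoShiporder.class, order -> "ship order " + order.orderId)
            .mapping(DemoPurchaseOrder.class, order -> "purchase order for " + order.billToName);

        System.out.println(router.route(new DemoShiporder("889923")));
        System.out.println(router.route(new DemoPurchaseOrder("name1")));
    }
}
```

In this sketch an unmapped payload type raises an exception, so a message of an unexpected type is surfaced rather than silently dropped.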
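Sample 5: IntegrationFlowAdapter
Besides the other implementations of Enterprise Integration Patterns (check them out), I need to mention IntegrationFlowAdapter. You extend this class and implement the buildFlow method, for example: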
@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(Amqp.inboundAdapter(this.rabbitConnectionFactory, "myQueue"))
                .<String, String>transform(String::toLowerCase)
                .channel(c -> c.queue("myFlowAdapterOutput"));
    }
}
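With this approach you can wrap repeated bean declarations into one component and give them the flow you need. Such a component can then be configured and handed to the calling code as a single class instance.
So let's make Sample 3 from this repo a bit shorter by defining a base class for all JMS endpoints that holds the repeated beans: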
public class JmsEndpoint extends IntegrationFlowAdapter {

    private String queueName;
    private String channelName;
    private String contextPath;

    /**
     * @param queueName
     * @param channelName
     * @param contextPath
     */
    public JmsEndpoint(String queueName, String channelName, String contextPath) {
        this.queueName = queueName;
        this.channelName = channelName;
        this.contextPath = contextPath;
    }

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(Jms.messageDrivenChannelAdapter(listenerContainer())
                .jmsMessageConverter(new MarshallingMessageConverter(shipOrdersMarshaller()))
        ).channel(channelName);
    }

    @Bean
    public Jaxb2Marshaller shipOrdersMarshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setContextPath(contextPath);
        return marshaller;
    }

    @Bean
    public DynamicDestinationResolver dynamicDestinationResolver() {
        return new DynamicDestinationResolver();
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory();
    }

    @Bean
    public DefaultMessageListenerContainer listenerContainer() {
        final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
        defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
        defaultMessageListenerContainer.setDestinationName(queueName);
        return defaultMessageListenerContainer;
    }

    @Bean
    public MessageChannel inboundChannel() {
        return MessageChannels.direct(channelName).get();
    }
}
Declaring a JMS endpoint for a specific queue is now easy:
@Bean
public JmsEndpoint jmsEndpoint() {
    return new JmsEndpoint("jms.activeMQ.Test", "inboundChannel", "com.example.stubs");
}
The service activator for inboundChannel:
/**
 * Sample 3, 5
 * @param shiporder
 */
@ServiceActivator(inputChannel = "inboundChannel")
public void processMessage(final Shiporder shiporder) {
    System.out.println(shiporder.getOrderid());
    System.out.println(shiporder.getOrderperson());
}
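You should not miss out on using IntegrationFlowAdapter in your projects. I like the concept.
I recently started using the Spring Integration Java DSL in a new Spring Boot-based project at Embedit. Even with only a handful of configurations, I have found it very useful.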