转载

【ActiveMQ中文指南】第二章

2.2 Transport ActiveMQ目前支持的transport有:VM Transport、TCP Transport、SSL Transport、Peer Transport、UDP Transport、Multicast Transport、HTTP and HTTPS Transport、Failover Transport、Fanout Transport、Discovery Transport、ZeroConf Transport等。以下简单介绍其中的几种,更多请参考Apache官方文档。   2.2.1 VM Transport VM transport允许在VM内部通信,从而避免了网络传输的开销。这时候采用的连接不是socket连接,而是直接地方法调用。 第一个创建VM 连接的客户会启动一个embed VM broker,接下来所有使用相同的broker name的VM连接都会使用这个broker。当这个broker上所有的连接都关闭的时候,这个broker也会自动关闭。 以下是配置语法: vm://brokerName?transportOptions 例如:vm://broker1?marshal=false&broker.persistent=false Transport Options的可选值如下:
Option Name Default Value Description
Marshal false If true, forces each command sent over the transport to be marshlled and unmarshlled using a WireFormat
wireFormat default The name of the WireFormat to use
wireFormat.* All the properties with this prefix are used to configure the wireFormat
create true If the broker should be created on demand if it does not allready exist. Only supported in ActiveMQ 4.1
broker.* All the properties with this prefix are used to configure the broker. See Configuring Wire Formats for more information
  以下是高级配置语法: vm:(broker:(tcp://localhost)?brokerOptions)?transportOptions vm:broker:(tcp://localhost)?brokerOptions 例如:vm:(broker:(tcp://localhost:6000)?persistent=false)?marshal=false Transport Options的可选值如下:
Option Name Default Value Description
marshal false If true, forces each command sent over the transport to be marshlled and unmarshlled using a WireFormat
wireFormat default The name of the WireFormat to use
wireFormat.* All the properties with this prefix are used to configure the wireFormat
  使用配置文件的配置语法: vm://localhost?brokerConfig=xbean:activemq.xml 例如:vm:// localhost?brokerConfig=xbean:com/test/activemq.xml   使用Spring的配置:
Xml代码  收藏代码
  1. <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
  2.   <property name="config" value="classpath:org/apache/activemq/xbean/activemq.xml" />
  3.   <property name="start" value="true" />
  4. </bean>
  5. <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">
  6.   <property name="brokerURL" value="vm://localhost"/>
  7. </bean>
如果persistent是true,那么ActiveMQ会在当前目录下创建一个缺省值是activemq-data的目录用于持久化保存数据。需要注意的是,如果程序中启动了多个不同名字的VM broker,那么可能会有如下警告:Failed to start jmx connector: Cannot bind to URL [rmi://localhost:1099/jmxrmi]: javax.naming.NameAlreadyBoundException…可以通过在transportOptions中追加broker.useJmx=false来禁用JMX来避免这个警告。   2.2.2 TCP Transport TCP transport 允许客户端通过TCP socket连接到远程的broker。以下是配置语法: tcp://hostname:port?transportOptions Transport Options的可选值如下:
Option Name Default Value Description
minmumWireFormatVersion 0 The minimum version wireformat that is allowed
trace false Causes all commands that are sent over the transport to be logged
useLocalHost true When true, it causes the local machines name to resolve to "localhost".
socketBufferSize 64 * 1024 Sets the socket buffer size in bytes
soTimeout 0 sets the socket timeout in milliseconds
connectionTimeout 30000 A non-zero value specifies the connection timeout in milliseconds. A zero value means wait forever for the connection to be established. Negative values are ignored.
wireFormat default The name of the WireFormat to use
wireFormat.* All the properties with this prefix are used to configure the wireFormat. See Configuring Wire Formats for more information
例如:tcp://localhost:61616?trace=false   2.2.3 Failover Transport Failover Transport是一种重新连接的机制,它工作于其它transport的上层,用于建立可靠的传输。它的配置语法允许制定任意多个复合的URI。Failover transport会自动选择其中的一个URI来尝试建立连接。如果没有成功,那么会选择一个其它的URI来建立一个新的连接。以下是配置语法: failover:(uri1,...,uriN)?transportOptions failover:uri1,...,uriN Transport Options的可选值如下:
Option Name D efault Value Description
initialReconnectDelay 10 How long to wait before the first reconnect attempt (in ms)
maxReconnectDelay 30000 The maximum amount of time we ever wait between reconnect attempts (in ms)
useExponentialBackOff true Should an exponential backoff be used between reconnect attempts
backOffMultiplier 2 The exponent used in the exponential backoff attempts
maxReconnectAttempts 0 If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client
randomize true use a random algorithm to choose the URI to use for reconnect from the list provided
backup false initialize and hold a second transport connection - to enable fast failover
例如:failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100   2.2.4 Discovery transport Discovery transport是可靠的tranport。它使用Discovery transport来定位用来连接的URI列表。以下是配置语法: discovery:(discoveryAgentURI)?transportOptions discovery:discoveryAgentURI Transport Options的可选值如下:
Option Name Default Value Description
initialReconnectDelay 10 How long to wait before the first reconnect attempt
maxReconnectDelay 30000 The maximum amount of time we ever wait between reconnect attempts
useExponentialBackOff true Should an exponential backoff be used btween reconnect attempts
backOffMultiplier 2 The exponent used in the exponential backoff attempts
maxReconnectAttempts 0 If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client
例如:discovery:(multicast://default)?initialReconnectDelay=100 为了使用Discovery来发现broker,需要为broker启用discovery agent。 以下是XML配置文件中的一个例子:
Xml代码  收藏代码
  1. <broker name="foo">
  2.    <transportConnectors>
  3.       <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
  4.     </transportConnectors>
  5.     ...
  6. </broker>
在使用Failover Transport或Discovery transport等能够自动重连的transport的时候,需要注意的是:设想有两个broker,它们都启用AMQ Message Store作为持久化存储,有一个producer和一个consumer连接到某个queue。当因其中一个broker失效时而切换到另一个broker的时候,如果失效的broker的queue中还有未被consumer消费的消息,那么这个queue里的消息仍然滞留在失效broker的中,直到失效的broker被修复并重新切换回这个被修复的broker后,之前被保留的消息才会被consumer消费掉。如果被处理的消息有时序限制,那么应用程序就需要处理这个问题。另外也可以通过ActiveMQ集群来解决这个问题。 在transport重连的时候,可以在connection上注册TransportListener来获得回调,例如:
Java代码  收藏代码
  1. (ActiveMQConnection)connection).addTransportListener(new TransportListener() {
  2.     public void onCommand(Object cmd) {
  3.     }
  4.     public void onException(IOException exp) {
  5.     }
  6.     public void transportInterupted() {
  7.         // The transport has suffered an interruption from which it hopes to recover.
  8.     }
  9.     public void transportResumed() {
  10.         // The transport has resumed after an interruption.
  11.     }
  12. });
正文到此结束
Loading...