2.2 Transport
ActiveMQ目前支持的transport有:VM Transport、TCP Transport、SSL Transport、Peer Transport、UDP Transport、Multicast Transport、HTTP and HTTPS Transport、Failover Transport、Fanout Transport、Discovery Transport、ZeroConf Transport等。以下简单介绍其中的几种,更多请参考Apache官方文档。
2.2.1 VM Transport
VM transport允许在VM内部通信,从而避免了网络传输的开销。这时候采用的连接不是socket连接,而是直接地方法调用。 第一个创建VM 连接的客户会启动一个embed VM broker,接下来所有使用相同的broker name的VM连接都会使用这个broker。当这个broker上所有的连接都关闭的时候,这个broker也会自动关闭。
以下是配置语法:
vm://brokerName?transportOptions
例如:vm://broker1?marshal=false&broker.persistent=false
Transport Options的可选值如下:
Option Name |
Default Value |
Description |
Marshal |
false |
If true, forces each command sent over the transport to be marshlled and unmarshlled using a WireFormat |
wireFormat |
default |
The name of the WireFormat to use |
wireFormat.* |
|
All the properties with this prefix are used to configure the wireFormat |
create |
true |
If the broker should be created on demand if it does not allready exist. Only supported in ActiveMQ 4.1 |
broker.* |
|
All the properties with this prefix are used to configure the broker. See Configuring Wire Formats for more information |
以下是高级配置语法:
vm:(broker:(tcp://localhost)?brokerOptions)?transportOptions
vm:broker:(tcp://localhost)?brokerOptions
例如:vm:(broker:(tcp://localhost:6000)?persistent=false)?marshal=false
Transport Options的可选值如下:
Option Name |
Default Value |
Description |
marshal |
false |
If true, forces each command sent over the transport to be marshlled and unmarshlled using a WireFormat |
wireFormat |
default |
The name of the WireFormat to use |
wireFormat.* |
|
All the properties with this prefix are used to configure the wireFormat |
使用配置文件的配置语法:
vm://localhost?brokerConfig=xbean:activemq.xml
例如:vm:// localhost?brokerConfig=xbean:com/test/activemq.xml
使用Spring的配置:
- <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
- <property name="config" value="classpath:org/apache/activemq/xbean/activemq.xml" />
- <property name="start" value="true" />
- </bean>
- <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">
- <property name="brokerURL" value="vm://localhost"/>
- </bean>
如果persistent是true,那么ActiveMQ会在当前目录下创建一个缺省值是activemq-data的目录用于持久化保存数据。需要注意的是,如果程序中启动了多个不同名字的VM broker,那么可能会有如下警告:Failed to start jmx connector: Cannot bind to URL [rmi://localhost:1099/jmxrmi]: javax.naming.NameAlreadyBoundException…可以通过在transportOptions中追加broker.useJmx=false来禁用JMX来避免这个警告。
2.2.2 TCP Transport
TCP transport 允许客户端通过TCP socket连接到远程的broker。以下是配置语法:
tcp://hostname:port?transportOptions
Transport Options的可选值如下:
Option Name |
Default Value |
Description |
minmumWireFormatVersion |
0 |
The minimum version wireformat that is allowed |
trace |
false |
Causes all commands that are sent over the transport to be logged |
useLocalHost |
true |
When true, it causes the local machines name to resolve to "localhost". |
socketBufferSize |
64 * 1024 |
Sets the socket buffer size in bytes |
soTimeout |
0 |
sets the socket timeout in milliseconds |
connectionTimeout |
30000 |
A non-zero value specifies the connection timeout in milliseconds. A zero value means wait forever for the connection to be established. Negative values are ignored. |
wireFormat |
default |
The name of the WireFormat to use |
wireFormat.* |
|
All the properties with this prefix are used to configure the wireFormat. See Configuring Wire Formats for more information |
例如:tcp://localhost:61616?trace=false
2.2.3 Failover Transport
Failover Transport是一种重新连接的机制,它工作于其它transport的上层,用于建立可靠的传输。它的配置语法允许制定任意多个复合的URI。Failover transport会自动选择其中的一个URI来尝试建立连接。如果没有成功,那么会选择一个其它的URI来建立一个新的连接。以下是配置语法:
failover:(uri1,...,uriN)?transportOptions
failover:uri1,...,uriN
Transport Options的可选值如下:
Option Name |
D efault Value |
Description |
initialReconnectDelay |
10 |
How long to wait before the first reconnect attempt (in ms) |
maxReconnectDelay |
30000 |
The maximum amount of time we ever wait between reconnect attempts (in ms) |
useExponentialBackOff |
true |
Should an exponential backoff be used between reconnect attempts |
backOffMultiplier |
2 |
The exponent used in the exponential backoff attempts |
maxReconnectAttempts |
0 |
If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client |
randomize |
true |
use a random algorithm to choose the URI to use for reconnect from the list provided |
backup |
false |
initialize and hold a second transport connection - to enable fast failover |
例如:failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100
2.2.4 Discovery transport
Discovery transport是可靠的tranport。它使用Discovery transport来定位用来连接的URI列表。以下是配置语法:
discovery:(discoveryAgentURI)?transportOptions
discovery:discoveryAgentURI
Transport Options的可选值如下:
Option Name |
Default Value |
Description |
initialReconnectDelay |
10 |
How long to wait before the first reconnect attempt |
maxReconnectDelay |
30000 |
The maximum amount of time we ever wait between reconnect attempts |
useExponentialBackOff |
true |
Should an exponential backoff be used btween reconnect attempts |
backOffMultiplier |
2 |
The exponent used in the exponential backoff attempts |
maxReconnectAttempts |
0 |
If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client |
例如:discovery:(multicast://default)?initialReconnectDelay=100
为了使用Discovery来发现broker,需要为broker启用discovery agent。 以下是XML配置文件中的一个例子:
- <broker name="foo">
- <transportConnectors>
- <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
- </transportConnectors>
- ...
- </broker>
在使用Failover Transport或Discovery transport等能够自动重连的transport的时候,需要注意的是:设想有两个broker,它们都启用AMQ Message Store作为持久化存储,有一个producer和一个consumer连接到某个queue。当因其中一个broker失效时而切换到另一个broker的时候,如果失效的broker的queue中还有未被consumer消费的消息,那么这个queue里的消息仍然滞留在失效broker的中,直到失效的broker被修复并重新切换回这个被修复的broker后,之前被保留的消息才会被consumer消费掉。如果被处理的消息有时序限制,那么应用程序就需要处理这个问题。另外也可以通过ActiveMQ集群来解决这个问题。
在transport重连的时候,可以在connection上注册TransportListener来获得回调,例如:
- (ActiveMQConnection)connection).addTransportListener(new TransportListener() {
- public void onCommand(Object cmd) {
- }
- public void onException(IOException exp) {
- }
- public void transportInterupted() {
-
- }
- public void transportResumed() {
-
- }
- });