如下图所示,生产者往broker61616发送消息,消费者通过broker61618接收消息。broker61616和broker61618通过networkConnectors连接。
Consumer的代码如下:
private static final String BROKEURL = "failover:(tcp://0.0.0.0:61618,tcp://0.0.0.0:61616)"; public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageConsumer consumer = null; Message message; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKEURL); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createQueue("test.failover"); // 创建一个消费者 consumer = session.createConsumer(destination); // 接收一个消息 while (null != (message = consumer.receive())) { System.out.println("consumer receive:" + ((TextMessage) message).getText()); Thread.sleep(1000); } } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { } }
Producer代码如下:
private static final String BROKEURL = "failover:(tcp://0.0.0.0:61616,tcp://0.0.0.0:61618)"; public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageProducer producer = null; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKEURL); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createQueue("test.failover"); // 创建一个生产者 producer = session.createProducer(destination); for (int i = 0; i < 10; i++) { // 创建一个消息 Message message = session.createTextMessage("this is test.failover:" + i); // 发送消息 producer.send(message); System.out.println(i); } } catch (JMSException e) { e.printStackTrace(); } finally { try { if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
启动两个ActiveMQ,61616和61618。producer往61616发送消息,consumer从61618接收数据。消息发送完后,把61618给关闭了,此时,consumer会报如下错误:
INFO | Successfully connected to tcp://0.0.0.0:61618 consumer receive:this is test.failover:0 consumer receive:this is test.failover:1 consumer receive:this is test.failover:2 WARN | Transport (tcp://0.0.0.0:61618) failed , attempting to automatically reconnect: {} java.net.SocketException: Connection reset at java.net.SocketInputStream.read(SocketInputStream.java:210) at java.net.SocketInputStream.read(SocketInputStream.java:141) at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50) at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:634) at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:59) at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:619) at java.io.DataInputStream.readInt(DataInputStream.java:387) at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268) at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) at java.lang.Thread.run(Thread.java:748) INFO | Successfully reconnected to tcp://0.0.0.0:61616
61618连接不了,最后一句可以看到重新连到61616,完成故障转移。此时重启61618,61618的几个消息被继续消费:
consumer receive:this is test.failover:3 consumer receive:this is test.failover:4 consumer receive:this is test.failover:5 consumer receive:this is test.failover:6 consumer receive:this is test.failover:7 consumer receive:this is test.failover:8 consumer receive:this is test.failover:9
本来想看看destinationPolicy的replayWhenNoConsumers属性,让消息从broker61618回流到broker61616,这边没设置,已经回流。我的版本是apache-activemq-5.15.12。
属性 | 默认值 | 描述 |
---|---|---|
initialReconnectDelay | 10 | 在第一次尝试重新连接之前等待的时间长度(毫秒) |
maxReconnectDelay | 30000 | 最长重连时间间隔(毫秒) |
useExponentialBackOff | true | 重连时间间隔是否以指数形式增长 |
backOffMultiplier | 2.0 | 递增倍数 |
maxReconnectAttempts | -1 | 自5.6版本开始,-1为默认值,代表不限重试次数,0标识从不重试(只尝试连接一次,并不重连),5.6以前的版本,0为默认值,代表不重试,如果设置大于0的数,则代表最大重试次数 |
startupMaxReconnectAttempts | 0 | 初始化时的最大重试次数 |
randomize | true | 使用随机连接,以达到负载均衡的目的 |
backup | false | 提前初始化一个未使用的链接,以便进行快速的失败转移 |
trackMessages | false | 设置是否缓存(故障发生时)尚未传送完成的消息,当broker一旦重新连接成功,便将这些缓存中的消息刷新到新连接的代理中,使得消息可以在broker切换前后顺利传送 |
maxCacheSize | 127*1024bytes | 当trackMessage启动时,缓存的最大字节 |
updateURISupported | true | 设定是否可以动态修改broker uri(自5.4版本开始) |