ActiveMQ - 基本概念提到了消息的几种类型,包括TextMessage、StreamMessage、ObjectMessage、MapMessage、BytesMessage、BlobMessage。属性包括boolean、byte、double、float、int、long、object、short、string。下面看看在代码中是怎么发送和接收的。
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; MessageProducer producer = null; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue Destination destination = session.createQueue("test.message"); // 创建一个生产者 producer = session.createProducer(destination); // 属性设置 sendProperty(session, producer); // 各种消息发送 sendTextMessage(session, producer); sendStreamMessage(session, producer); sendObjectMessage(session, producer); sendMapMessage(session, producer); sendBytesMessage(session, producer); } catch (JMSException e) { e.printStackTrace(); } finally { try { if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } } private static void sendBytesMessage(Session session, MessageProducer producer) throws JMSException { BytesMessage bytesMessage = session.createBytesMessage(); bytesMessage.writeBytes("hello".getBytes()); producer.send(bytesMessage); } private static void sendMapMessage(Session session, MessageProducer producer) throws JMSException { MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("name", "张三"); producer.send(mapMessage); } private static void sendObjectMessage(Session session, MessageProducer producer) throws JMSException { ObjectMessage objectMessage = session.createObjectMessage(new Person("张三", 18)); producer.send(objectMessage); } private static void sendStreamMessage(Session session, MessageProducer producer) throws JMSException { StreamMessage message = session.createStreamMessage(); message.writeString("test.stream"); producer.send(message); } public static void sendProperty(Session session, MessageProducer producer) throws JMSException { // JMS提供了boolean、byte、double、float、int、long、object、short、string的属性设置类型 // 这边只演示string // 创建一个消息 Message message = session.createMessage(); // 设置熟悉 message.setStringProperty("name", "张三"); // 发送消息 producer.send(message); } public static void sendTextMessage(Session session, MessageProducer producer) throws JMSException { // 创建一个消息 TextMessage textMessage = session.createTextMessage(); textMessage.setText("this is test.message"); // 发送消息 producer.send(textMessage); }
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageConsumer consumer = null; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); ((ActiveMQConnectionFactory) connectionFactory).setTrustAllPackages(true); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createQueue("test.message"); // 创建一个消费者 consumer = session.createConsumer(destination); // 接收一个消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { if (message.propertyExists("name")) { System.out.println("property receive:" + message.getStringProperty("name")); } if (message instanceof TextMessage) { System.out.println("TextMessage receive:" + ((TextMessage) message).getText()); } if (message instanceof StreamMessage) { System.out.println("StreamMessage receive:" + ((StreamMessage) message).readString()); } if (message instanceof ObjectMessage) { ObjectMessage objectMessage = (ObjectMessage) message; Person person = (Person) objectMessage.getObject(); System.out.println("ObjectMessage receive:" + person); } if (message instanceof MapMessage) { System.out.println("MapMessage receive:" + ((MapMessage) message).getString("name")); } if (message instanceof BytesMessage) { byte[] b = new byte[1024]; BytesMessage bytesMessage =(BytesMessage) message; System.out.println("BytesMessage receive:" +new String(b, 0, bytesMessage.readBytes(b))); } } catch (JMSException e) { e.printStackTrace(); } } }); } catch (JMSException e) { e.printStackTrace(); } finally { } }
客户端收到消息结果如下:
property receive:张三 TextMessage receive:this is test.message StreamMessage receive:test.stream ObjectMessage receive:Person{name='张三', age=18} MapMessage receive:张三 BytesMessage receive:hello
ActiveMQ - 基本概念提供通过对消息头的消息进行过滤,使消费者对自己感兴趣的事件进行订阅,我们看看下面的例子:
生产者发送张三18岁,李四20岁,王五22岁的信息。消费者1接收名字是张三的信息,消费者2接收年龄大于18岁的信息,消费者3接收名字是李四、王五且年龄大于20岁的信息。
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageProducer producer = null; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createTopic("test.selector"); // 创建一个生产者 producer = session.createProducer(destination); // 创建消息 sendPerson1(session, producer); sendPerson2(session, producer); sendPerson3(session, producer); } catch (JMSException e) { e.printStackTrace(); } finally { try { if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } } public static void sendPerson1(Session session, MessageProducer producer) throws JMSException { TextMessage textMessage = session.createTextMessage(); textMessage.setText("张三的个人信息"); textMessage.setStringProperty("name", "张三"); textMessage.setIntProperty("age", 18); producer.send(textMessage); } public static void sendPerson2(Session session, MessageProducer producer) throws JMSException { TextMessage textMessage = session.createTextMessage(); textMessage.setText("李四的个人信息"); textMessage.setStringProperty("name", "李四"); textMessage.setIntProperty("age", 20); producer.send(textMessage); } public static void sendPerson3(Session session, MessageProducer producer) throws JMSException { TextMessage textMessage = session.createTextMessage(); textMessage.setText("王五的个人信息"); textMessage.setStringProperty("name", "王五"); textMessage.setIntProperty("age", 22); producer.send(textMessage); }
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageConsumer consumer = null; Message message; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createTopic("test.selector"); // 创建一个消费者 String selector = "name='张三'"; consumer = session.createConsumer(destination, selector); // 接收一个消息 while (null != (message = consumer.receive())) { System.out.println("consumer1 receive:" + ((TextMessage) message).getText()); } } catch (JMSException e) { e.printStackTrace(); } finally { try { if (consumer != null) { consumer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageConsumer consumer = null; Message message; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createTopic("test.selector"); // 创建一个消费者 String selector = "age>18"; consumer = session.createConsumer(destination, selector); // 接收一个消息 while (null != (message = consumer.receive())) { System.out.println("consumer2 receive:" + ((TextMessage) message).getText()); } } catch (JMSException e) { e.printStackTrace(); } finally { try { if (consumer != null) { consumer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageConsumer consumer = null; Message message; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createTopic("test.selector"); // 创建一个消费者 String selector = "name in ('李四','王五') and age>20"; consumer = session.createConsumer(destination, selector); // 接收一个消息 while (null != (message = consumer.receive())) { System.out.println("consumer3 receive:" + ((TextMessage) message).getText()); } } catch (JMSException e) { e.printStackTrace(); } finally { try { if (consumer != null) { consumer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
选择器有点跟sql语法一样。