转载

ActiveMQ - 消息

消息类型及属性

《ActiveMQ - 基本概念》一文提到了消息的几种类型,包括TextMessage、StreamMessage、ObjectMessage、MapMessage、BytesMessage、BlobMessage;消息属性的类型包括boolean、byte、double、float、int、long、object、short、string。下面看看在代码中是怎么发送和接收的。

生产者

/**
 * Demo producer: opens a connection through a no-argument
 * {@code ActiveMQConnectionFactory}, then sends one property-only message and
 * one message of each demonstrated body type to the queue "test.message".
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Connection connection = null;
    try {
        // Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        // Create the connection
        connection = connectionFactory.createConnection();
        // Start message delivery on the connection
        connection.start();
        // Create a non-transacted session with automatic acknowledgement
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination: messages are sent to the queue "test.message"
        Destination destination = session.createQueue("test.message");
        // Create the producer
        MessageProducer producer = session.createProducer(destination);
        // Message carrying only a property
        sendProperty(session, producer);
        // One message per body type
        sendTextMessage(session, producer);
        sendStreamMessage(session, producer);
        sendObjectMessage(session, producer);
        sendMapMessage(session, producer);
        sendBytesMessage(session, producer);
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        if (connection != null) {
            try {
                // Closing the connection also closes its sessions and producers
                // (JMS Connection.close contract), so this single call cannot be
                // skipped by a failure while closing the producer or the session.
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * Sends a BytesMessage whose body is the text "hello" encoded as UTF-8.
 *
 * @param session  session used to create the message
 * @param producer producer used to send the message
 * @throws JMSException if creating, filling or sending the message fails
 */
private static void sendBytesMessage(Session session, MessageProducer producer) throws JMSException {
    BytesMessage bytesMessage = session.createBytesMessage();
    // Encode with an explicit charset: the no-argument getBytes() uses the
    // platform default, which may differ between producer and consumer hosts.
    bytesMessage.writeBytes("hello".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    producer.send(bytesMessage);
}

/**
 * Sends a MapMessage containing a single string entry under the key "name".
 *
 * @param session  session used to create the message
 * @param producer producer used to send the message
 * @throws JMSException if creating, filling or sending the message fails
 */
private static void sendMapMessage(Session session, MessageProducer producer) throws JMSException {
    MapMessage payload = session.createMapMessage();
    payload.setString("name", "张三");
    producer.send(payload);
}

/**
 * Sends an ObjectMessage wrapping a newly created {@code Person}.
 *
 * @param session  session used to create the message
 * @param producer producer used to send the message
 * @throws JMSException if creating or sending the message fails
 */
private static void sendObjectMessage(Session session, MessageProducer producer) throws JMSException {
    Person person = new Person("张三", 18);
    ObjectMessage payload = session.createObjectMessage(person);
    producer.send(payload);
}

/**
 * Sends a StreamMessage whose body holds one string value.
 *
 * @param session  session used to create the message
 * @param producer producer used to send the message
 * @throws JMSException if creating, filling or sending the message fails
 */
private static void sendStreamMessage(Session session, MessageProducer producer) throws JMSException {
    StreamMessage streamMessage = session.createStreamMessage();
    streamMessage.writeString("test.stream");
    producer.send(streamMessage);
}

/**
 * Sends a body-less message that carries one string property, "name".
 *
 * <p>JMS offers property setters for boolean, byte, double, float, int, long,
 * object, short and string; only the string variant is demonstrated here.
 *
 * @param session  session used to create the message
 * @param producer producer used to send the message
 * @throws JMSException if creating the message, setting the property or sending fails
 */
public static void sendProperty(Session session, MessageProducer producer) throws JMSException {
    // Create a plain message with no body
    Message propertyCarrier = session.createMessage();
    // Set the property
    propertyCarrier.setStringProperty("name", "张三");
    // Send it
    producer.send(propertyCarrier);
}

/**
 * Sends a TextMessage with a fixed text body.
 *
 * @param session  session used to create the message
 * @param producer producer used to send the message
 * @throws JMSException if creating or sending the message fails
 */
public static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
    // Create the message already initialised with its text
    TextMessage greeting = session.createTextMessage("this is test.message");
    // Send it
    producer.send(greeting);
}

消费者

/**
 * Demo consumer: registers a listener on the queue "test.message" and prints
 * the "name" property (when present) and the body of each received message,
 * according to its JMS message type.
 *
 * <p>The connection is not closed in this method, as in the original snippet;
 * messages are handed to the listener asynchronously.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    try {
        // Create the ConnectionFactory; typed concretely so no cast is needed below
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        // NOTE(review): trusting all packages lets an ObjectMessage deserialize any
        // class. Acceptable for a local demo; with untrusted producers restrict it
        // (e.g. setTrustedPackages with only the package containing Person).
        connectionFactory.setTrustAllPackages(true);
        // Create the connection
        Connection connection = connectionFactory.createConnection();
        // Start message delivery on the connection
        connection.start();
        // Create a non-transacted session with automatic acknowledgement
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination: messages are read from the queue "test.message"
        Destination destination = session.createQueue("test.message");
        // Create the consumer
        MessageConsumer consumer = session.createConsumer(destination);
        // Receive messages asynchronously
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    if (message.propertyExists("name")) {
                        System.out.println("property receive:" + message.getStringProperty("name"));
                    }
                    if (message instanceof TextMessage) {
                        System.out.println("TextMessage receive:" + ((TextMessage) message).getText());
                    }
                    if (message instanceof StreamMessage) {
                        System.out.println("StreamMessage receive:" + ((StreamMessage) message).readString());
                    }
                    if (message instanceof ObjectMessage) {
                        ObjectMessage objectMessage = (ObjectMessage) message;
                        Person person = (Person) objectMessage.getObject();
                        System.out.println("ObjectMessage receive:" + person);
                    }
                    if (message instanceof MapMessage) {
                        System.out.println("MapMessage receive:" + ((MapMessage) message).getString("name"));
                    }
                    if (message instanceof BytesMessage) {
                        System.out.println("BytesMessage receive:" + readBytesBody((BytesMessage) message));
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    } catch (JMSException e) {
        e.printStackTrace();
    }
}

/**
 * Reads the whole body of a received BytesMessage and decodes it as UTF-8 text.
 *
 * @param bytesMessage received message to read
 * @return the decoded body, or an empty string when the body is empty
 * @throws JMSException if the body length or the bytes cannot be read
 */
private static String readBytesBody(BytesMessage bytesMessage) throws JMSException {
    // Size the buffer from the body length so long bodies are not truncated
    byte[] body = new byte[(int) bytesMessage.getBodyLength()];
    int read = bytesMessage.readBytes(body);
    // readBytes reports -1 at end of stream; a negative length would make
    // the String constructor throw
    if (read <= 0) {
        return "";
    }
    return new String(body, 0, read, java.nio.charset.StandardCharsets.UTF_8);
}

客户端收到消息结果如下:

property receive:张三
TextMessage receive:this is test.message
StreamMessage receive:test.stream
ObjectMessage receive:Person{name='张三', age=18}
MapMessage receive:张三
BytesMessage receive:hello

消息选择器

《ActiveMQ - 基本概念》一文提到,消息选择器可以根据消息头和消息属性对消息进行过滤,使消费者只接收自己感兴趣的消息。我们看看下面的例子:

生产者发送张三18岁、李四20岁、王五22岁三条信息。消费者1接收名字是张三的信息,消费者2接收年龄大于18岁的信息,消费者3接收名字是李四或王五、且年龄大于20岁的信息。

生产者

/**
 * Demo producer for message selectors: publishes three person messages, each
 * tagged with "name" and "age" properties, to the topic "test.selector".
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Connection connection = null;
    try {
        // Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        // Create the connection
        connection = connectionFactory.createConnection();
        // Start message delivery on the connection
        connection.start();
        // Create a non-transacted session with automatic acknowledgement
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination: messages are published to the topic "test.selector"
        Destination destination = session.createTopic("test.selector");
        // Create the producer
        MessageProducer producer = session.createProducer(destination);
        // Create and send the messages
        sendPerson1(session, producer);
        sendPerson2(session, producer);
        sendPerson3(session, producer);
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        if (connection != null) {
            try {
                // Closing the connection also closes its sessions and producers
                // (JMS Connection.close contract), so this single call cannot be
                // skipped by a failure while closing the producer or the session.
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * Sends the first person's text message, tagged with the properties
 * name = "张三" and age = 18.
 *
 * @param session  session used to create the message
 * @param producer producer used to send the message
 * @throws JMSException if creating the message, setting a property or sending fails
 */
public static void sendPerson1(Session session, MessageProducer producer) throws JMSException {
    TextMessage personInfo = session.createTextMessage("张三的个人信息");
    personInfo.setStringProperty("name", "张三");
    personInfo.setIntProperty("age", 18);
    producer.send(personInfo);
}

/**
 * Sends the second person's text message, tagged with the properties
 * name = "李四" and age = 20.
 *
 * @param session  session used to create the message
 * @param producer producer used to send the message
 * @throws JMSException if creating the message, setting a property or sending fails
 */
public static void sendPerson2(Session session, MessageProducer producer) throws JMSException {
    TextMessage personInfo = session.createTextMessage("李四的个人信息");
    personInfo.setStringProperty("name", "李四");
    personInfo.setIntProperty("age", 20);
    producer.send(personInfo);
}

/**
 * Sends the third person's text message, tagged with the properties
 * name = "王五" and age = 22.
 *
 * @param session  session used to create the message
 * @param producer producer used to send the message
 * @throws JMSException if creating the message, setting a property or sending fails
 */
public static void sendPerson3(Session session, MessageProducer producer) throws JMSException {
    TextMessage personInfo = session.createTextMessage("王五的个人信息");
    personInfo.setStringProperty("name", "王五");
    personInfo.setIntProperty("age", 22);
    producer.send(personInfo);
}

消费者1

/**
 * Selector demo, consumer 1: subscribes to the topic "test.selector" with the
 * selector {@code name='张三'} and prints the text of every message received.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Connection connection = null;
    try {
        // Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        // Create the connection
        connection = connectionFactory.createConnection();
        // Start message delivery on the connection
        connection.start();
        // Create a non-transacted session with automatic acknowledgement
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination: messages are read from the topic "test.selector"
        Destination destination = session.createTopic("test.selector");
        // Create a consumer that only receives messages matching the selector
        String selector = "name='张三'";
        MessageConsumer consumer = session.createConsumer(destination, selector);
        // Receive synchronously; the loop ends only if receive() returns null
        Message message;
        while ((message = consumer.receive()) != null) {
            System.out.println("consumer1 receive:" + ((TextMessage) message).getText());
        }
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        if (connection != null) {
            try {
                // Closing the connection also closes its sessions and consumers
                // (JMS Connection.close contract), so this single call cannot be
                // skipped by a failure while closing the consumer or the session.
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者2

/**
 * Selector demo, consumer 2: subscribes to the topic "test.selector" with the
 * selector {@code age>18} and prints the text of every message received.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Connection connection = null;
    try {
        // Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        // Create the connection
        connection = connectionFactory.createConnection();
        // Start message delivery on the connection
        connection.start();
        // Create a non-transacted session with automatic acknowledgement
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination: messages are read from the topic "test.selector"
        Destination destination = session.createTopic("test.selector");
        // Create a consumer that only receives messages matching the selector
        String selector = "age>18";
        MessageConsumer consumer = session.createConsumer(destination, selector);
        // Receive synchronously; the loop ends only if receive() returns null
        Message message;
        while ((message = consumer.receive()) != null) {
            System.out.println("consumer2 receive:" + ((TextMessage) message).getText());
        }
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        if (connection != null) {
            try {
                // Closing the connection also closes its sessions and consumers
                // (JMS Connection.close contract), so this single call cannot be
                // skipped by a failure while closing the consumer or the session.
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者3

/**
 * Selector demo, consumer 3: subscribes to the topic "test.selector" with the
 * selector {@code name in ('李四','王五') and age>20} and prints the text of
 * every message received.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Connection connection = null;
    try {
        // Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        // Create the connection
        connection = connectionFactory.createConnection();
        // Start message delivery on the connection
        connection.start();
        // Create a non-transacted session with automatic acknowledgement
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination: messages are read from the topic "test.selector"
        Destination destination = session.createTopic("test.selector");
        // Create a consumer that only receives messages matching the selector
        String selector = "name in ('李四','王五') and age>20";
        MessageConsumer consumer = session.createConsumer(destination, selector);
        // Receive synchronously; the loop ends only if receive() returns null
        Message message;
        while ((message = consumer.receive()) != null) {
            System.out.println("consumer3 receive:" + ((TextMessage) message).getText());
        }
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        if (connection != null) {
            try {
                // Closing the connection also closes its sessions and consumers
                // (JMS Connection.close contract), so this single call cannot be
                // skipped by a failure while closing the consumer or the session.
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

选择器的语法与SQL的条件表达式类似。

原文  https://segmentfault.com/a/1190000022383328
正文到此结束
Loading...