转载

ActiveMQ - 通配符

之前消息选择器的例子,消费者通过消息选择器来获取自己想要的消息,我们也可以通过另外通配符的方式,来获取自己想要的消息,当然两种还是有区别的。

通配符的方式有三种:

  • ".":用于分隔路径名字
  • "*":用于匹配路径的任何名字
  • ">":用于匹配末尾的名称

示例

消费者1消费a.b.c的消息,消费者2消费a.*.c的消息,所以中间是什么他不在乎,消费者3消费a.开头的消息,所以a.后面是什么他不在乎。生产者往a.b.c和a.b.c发送数据。

生产者代码

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    Connection connection = null;
    Session session = null;
    boolean useTransaction = false;
    try {
        // 创建一个ConnectionFactory
        connectionFactory = new ActiveMQConnectionFactory();
        // 创建一个Connection
        connection = connectionFactory.createConnection();
        // 启动消息传递的连接
        connection.start();
        // 创建一个session
        session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
        // 创建消息
        sendMsg1(session);
        sendMsg2(session);
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        try {
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

public static void sendMsg1(Session session) throws JMSException {
    Destination destination = session.createTopic("a.b.c");
    MessageProducer producer = session.createProducer(destination);
    producer.send(session.createTextMessage("this is a.b.c"));
    producer.close();
}

public static void sendMsg2(Session session) throws JMSException {
    Destination destination = session.createTopic("a.bb.c");
    MessageProducer producer = session.createProducer(destination);
    producer.send(session.createTextMessage("this is a.bb.c"));
    producer.close();
}

消费者1

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    Connection connection = null;
    Session session = null;
    Destination destination;
    MessageConsumer consumer = null;
    Message message;
    boolean useTransaction = false;
    try {
        // 创建一个ConnectionFactory
        connectionFactory = new ActiveMQConnectionFactory();
        // 创建一个Connection
        connection = connectionFactory.createConnection();
        // 启动消息传递的连接
        connection.start();
        // 创建一个session
        session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
        // 创建一个destination,把消息发送到test.queue
        destination = session.createTopic("a.b.c");
        // 创建一个消费者
        consumer = session.createConsumer(destination);
        // 接收一个消息
        while (null != (message = consumer.receive())) {
            System.out.println("consumer1 receive:" + ((TextMessage) message).getText());
        }
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        try {
            if (consumer != null) {
                consumer.close();
            }
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

消费者2

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    Connection connection = null;
    Session session = null;
    Destination destination;
    MessageConsumer consumer = null;
    Message message;
    boolean useTransaction = false;
    try {
        // 创建一个ConnectionFactory
        connectionFactory = new ActiveMQConnectionFactory();
        // 创建一个Connection
        connection = connectionFactory.createConnection();
        // 启动消息传递的连接
        connection.start();
        // 创建一个session
        session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
        // 创建一个destination,把消息发送到test.queue
        destination = session.createTopic("a.*.c");
        consumer = session.createConsumer(destination);
        // 接收一个消息
        while (null != (message = consumer.receive())) {
            System.out.println("consumer2 receive:" + ((TextMessage) message).getText());
        }
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        try {
            if (consumer != null) {
                consumer.close();
            }
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

消费者3

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    Connection connection = null;
    Session session = null;
    Destination destination;
    MessageConsumer consumer = null;
    Message message;
    boolean useTransaction = false;
    try {
        // 创建一个ConnectionFactory
        connectionFactory = new ActiveMQConnectionFactory();
        // 创建一个Connection
        connection = connectionFactory.createConnection();
        // 启动消息传递的连接
        connection.start();
        // 创建一个session
        session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
        // 创建一个destination,把消息发送到test.queue
        destination = session.createTopic("a.>");
        consumer = session.createConsumer(destination);
        // 接收一个消息
        while (null != (message = consumer.receive())) {
            System.out.println("consumer3 receive:" + ((TextMessage) message).getText());
        }
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        try {
            if (consumer != null) {
                consumer.close();
            }
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

运行结果如下:

消费者1只获取1个消息

ActiveMQ - 通配符

消费者2获取2个消息

ActiveMQ - 通配符

消费者3也获取2个消息

ActiveMQ - 通配符

原文  https://segmentfault.com/a/1190000022578497
正文到此结束
Loading...