1、publish/subscribe与work queues有什么区别。
区别:
1)work queues不用定义交换机,而publish/subscribe需要定义交换机。
2)publish/subscribe的生产方是面向交换机发送消息,workqueues的生产方是面向队列发送消息(底层使用默认交换机)。
3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实质上work queues会将队列绑定到默认的交换机 。
相同点: 所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
1、每个消费者监听自己的队列,并且设置routingkey。
2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。
1、生产者 声明exchange_routing_inform交换机。 声明两个队列并且绑定到此交换机,绑定时需要指定routingkey 发送消息时需要指定routingkey
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer03_routing { //队列名称 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { //创建一个与MQ的连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务 器 //创建一个连接 connection = factory.newConnection(); //创建与交换机的通道,每个通道代表一个会话 channel = connection.createChannel(); //声明交换机 String exchange, BuiltinExchangeType type /** * 参数明细 * 1、交换机名称 * 2、交换机类型,fanout、topic、direct、headers */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); //声明队列 // channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) /** * 参数明细: * 1、队列名称 * 2、是否持久化 * 3、是否独占此队列 * 4、队列不用是否自动删除 * 5、参数 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); //交换机和队列绑定String queue, String exchange, String routingKey /** * 参数明细 * 1、队列名称 * 2、交换机名称 * 3、路由key */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS); //发送邮件消息 for (int i=0;i<10;i++){ String message = "email inform to user"+i; //向交换机发送消息 String exchange, String routingKey, BasicProperties props, byte[] body /** * 参数明细 * 1、交换机名称,不指令使用默认交换机名称 Default Exchange * 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消 息将发到此队列 * 3、消息属性 * 4、消息内容 */ channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL, null, message.getBytes()); System.out.println("Send Message is:'" + message + "'"); } //发送短信消息 for (int i=0;i<10;i++){ String message = "sms inform to user"+i; //向交换机发送消息 String exchange, String routingKey, BasicProperties props, byte[] body channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null, message.getBytes()); System.out.println("Send Message is:'" + message + "'"); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally{ if(channel!=null){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if(connection!=null){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } } 复制代码
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer03_routing_email { //队列名称 private static final String QUEUE_INFORM_EMAIL = "inform_queue_email"; private static final String EXCHANGE_ROUTING_INFORM="inform_exchange_routing"; public static void main(String[] args) throws IOException, TimeoutException { //创建一个与MQ的连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器 //创建一个连接 Connection connection = factory.newConnection(); //创建与交换机的通道,每个通道代表一个会话 Channel channel = connection.createChannel(); //声明交换机 String exchange, BuiltinExchangeType type /** * 参数明细 * 1、交换机名称 * 2、交换机类型,fanout、topic、direct、headers */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); //声明队列 // channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) /** * 参数明细: * 1、队列名称 * 2、是否持久化 * 3、是否独占此队列 * 4、队列不用是否自动删除 * 5、参数 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); //交换机和队列绑定String queue, String exchange, String routingKey /** * 参数明细 * 1、队列名称 * 2、交换机名称 * 3、路由key */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL); //定义消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); String exchange = envelope.getExchange(); //消息内容 String message = new String(body, "utf‐8"); System.out.println(message); } }; /** * 监听队列String queue, boolean autoAck,Consumer callback * 参数明细 * 1、队列名称 * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 为false则需要手动回复 * 3、消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer); } } 复制代码
3、短信发送消费者 参考邮件发送消费者的代码流程,编写短信通知的代码。
打开RabbitMQ的管理界面,观察交换机绑定情况:
路由模式:
1、每个消费者监听自己的队列,并且设置带统配符的routingkey。
2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。
1、生产者 声明交换机,指定topic类型:
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer04_topics { //队列名称 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { //创建一个与MQ的连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务 器 //创建一个连接 connection = factory.newConnection(); //创建与交换机的通道,每个通道代表一个会话 channel = connection.createChannel(); //声明交换机 String exchange, BuiltinExchangeType type /** * 参数明细 * 1、交换机名称 * 2、交换机类型,fanout、topic、direct、headers */ channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); //声明队列 /** * 参数明细: * 1、队列名称 * 2、是否持久化 * 3、是否独占此队列 * 4、队列不用是否自动删除 * 5、参数 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); //发送邮件消息 for (int i=0;i<10;i++){ String message = "email inform to user"+i; //向交换机发送消息 String exchange, String routingKey, BasicProperties props, byte[] body /** * 参数明细 * 1、交换机名称,不指令使用默认交换机名称 Default Exchange * 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消 息将发到此队列 * 3、消息属性 * 4、消息内容 */ channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null, message.getBytes()); System.out.println("Send Message is:'" + message + "'"); } //发送短信消息 for (int i=0;i<10;i++){ String message = "sms inform to user"+i; channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null, message.getBytes()); System.out.println("Send Message is:'" + message + "'"); } //发送短信和邮件消息 for (int i=0;i<10;i++){ String message = "sms and email inform to user"+i; channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms.email", null, message.getBytes()); System.out.println("Send Message is:'" + message + "'"); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally{ if(channel!=null){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if(connection!=null){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } } 复制代码
队列绑定交换机指定通配符: 统配符规则: 中间以“.”分隔。 符号#可以匹配多个词,符号*可以匹配一个词语。
//声明队列 channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); //声明交换机 channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); //绑定email通知队列 channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,"inform.#.email.#"); //绑定sms通知队列 channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,"inform.#.sms.#"); 复制代码
header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配 队列。
案例: 根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种 通知类型都接收的则两种通知都有效。 代码:
1)生产者 队列与交换机绑定的代码与之前不同,如下:
Map<String, Object> headers_email = new Hashtable<String, Object>(); headers_email.put("inform_type", "email"); Map<String, Object> headers_sms = new Hashtable<String, Object>(); headers_sms.put("inform_type", "sms"); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms); 复制代码
通知:
String message = "email inform to user"+i; Map<String,Object> headers = new Hashtable<String, Object>(); headers.put("inform_type", "email");//匹配email通知消费者绑定的header //headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); properties.headers(headers); //Email通知 channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes()); 复制代码
2)发送邮件消费者
channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS); Map<String, Object> headers_email = new Hashtable<String, Object>(); headers_email.put("inform_email", "email"); //交换机和队列绑定 channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); //指定消费队列 channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer); 复制代码
3)测试
1、客户端即是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果
3、服务端将RPC方法 的结果发送到RPC响应队列
4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。