转载

初始Java与RabbitMQ(三)

1、publish/subscribe与work queues有什么区别。

区别:

1)work queues不用定义交换机,而publish/subscribe需要定义交换机。

2)publish/subscribe的生产方是面向交换机发送消息,workqueues的生产方是面向队列发送消息(底层使用默认交换机)。

3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实质上work queues会将队列绑定到默认的交换机 。

相同点: 所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。

Routing

工作模式

初始Java与RabbitMQ(三)

路由模式:

1、每个消费者监听自己的队列,并且设置routingkey。

2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。

代码

1、生产者 声明exchange_routing_inform交换机。 声明两个队列并且绑定到此交换机,绑定时需要指定routingkey 发送消息时需要指定routingkey

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer03_routing {
//队列名称
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//创建一个与MQ的连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务
器
//创建一个连接
connection = factory.newConnection();
//创建与交换机的通道,每个通道代表一个会话
channel = connection.createChannel();
//声明交换机 String exchange, BuiltinExchangeType type
/**
* 参数明细
* 1、交换机名称
* 2、交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//声明队列
// channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean
autoDelete, Map<String, Object> arguments)
/**
* 参数明细:
* 1、队列名称
* 2、是否持久化
* 3、是否独占此队列
* 4、队列不用是否自动删除
* 5、参数
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
//交换机和队列绑定String queue, String exchange, String routingKey
/**
* 参数明细
* 1、队列名称
* 2、交换机名称
* 3、路由key
*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS);
//发送邮件消息
for (int i=0;i<10;i++){
String message = "email inform to user"+i;
//向交换机发送消息 String exchange, String routingKey, BasicProperties props,
byte[] body
/**
* 参数明细
* 1、交换机名称,不指令使用默认交换机名称 Default Exchange
* 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消
息将发到此队列
* 3、消息属性
* 4、消息内容
*/
channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL, null,
message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
//发送短信消息
for (int i=0;i<10;i++){
String message = "sms inform to user"+i;
//向交换机发送消息 String exchange, String routingKey, BasicProperties props,
byte[] body
channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null,
message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally{
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
复制代码

邮件发送消费者

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer03_routing_email {
//队列名称
private static final String QUEUE_INFORM_EMAIL = "inform_queue_email";
private static final String EXCHANGE_ROUTING_INFORM="inform_exchange_routing";
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个与MQ的连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
//创建一个连接
Connection connection = factory.newConnection();
//创建与交换机的通道,每个通道代表一个会话
Channel channel = connection.createChannel();
//声明交换机 String exchange, BuiltinExchangeType type
/**
* 参数明细
* 1、交换机名称
* 2、交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//声明队列
// channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean
autoDelete, Map<String, Object> arguments)
/**
* 参数明细:
* 1、队列名称
* 2、是否持久化
* 3、是否独占此队列
* 4、队列不用是否自动删除
* 5、参数
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
//交换机和队列绑定String queue, String exchange, String routingKey
/**
* 参数明细
* 1、队列名称
* 2、交换机名称
* 3、路由key
*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL);
//定义消费方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String exchange = envelope.getExchange();
//消息内容
String message = new String(body, "utf‐8");
System.out.println(message);
}
};
/**
* 监听队列String queue, boolean autoAck,Consumer callback
* 参数明细
* 1、队列名称
* 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
为false则需要手动回复
* 3、消费消息的方法,消费者接收到消息后调用此方法
*/
channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
}
}
复制代码

3、短信发送消费者 参考邮件发送消费者的代码流程,编写短信通知的代码。

测试

打开RabbitMQ的管理界面,观察交换机绑定情况:

初始Java与RabbitMQ(三)

Topics

初始Java与RabbitMQ(三)

路由模式:

1、每个消费者监听自己的队列,并且设置带统配符的routingkey。

2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。

代码

1、生产者 声明交换机,指定topic类型:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer04_topics {
//队列名称
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//创建一个与MQ的连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务
器
//创建一个连接
connection = factory.newConnection();
//创建与交换机的通道,每个通道代表一个会话
channel = connection.createChannel();
//声明交换机 String exchange, BuiltinExchangeType type
/**
* 参数明细
* 1、交换机名称
* 2、交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
//声明队列
/**
* 参数明细:
* 1、队列名称
* 2、是否持久化
* 3、是否独占此队列
* 4、队列不用是否自动删除
* 5、参数
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
//发送邮件消息
for (int i=0;i<10;i++){
String message = "email inform to user"+i;
//向交换机发送消息 String exchange, String routingKey, BasicProperties props,
byte[] body
/**
* 参数明细
* 1、交换机名称,不指令使用默认交换机名称 Default Exchange
* 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消
息将发到此队列
* 3、消息属性
* 4、消息内容
*/
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null,
message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
//发送短信消息
for (int i=0;i<10;i++){
String message = "sms inform to user"+i;
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null,
message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
//发送短信和邮件消息
for (int i=0;i<10;i++){
String message = "sms and email inform to user"+i;
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms.email", null,
message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally{
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
复制代码

消费端

队列绑定交换机指定通配符: 统配符规则: 中间以“.”分隔。 符号#可以匹配多个词,符号*可以匹配一个词语。

//声明队列
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
//声明交换机
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
//绑定email通知队列
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,"inform.#.email.#");
//绑定sms通知队列
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,"inform.#.sms.#");
复制代码

测试

初始Java与RabbitMQ(三)

Header模式

header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配 队列。

案例: 根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种 通知类型都接收的则两种通知都有效。 代码:

1)生产者 队列与交换机绑定的代码与之前不同,如下:

Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_type", "email");
Map<String, Object> headers_sms = new Hashtable<String, Object>();
headers_sms.put("inform_type", "sms");
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);
复制代码

通知:

String message = "email inform to user"+i;
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("inform_type", "email");//匹配email通知消费者绑定的header
//headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
properties.headers(headers);
//Email通知
channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());
复制代码

2)发送邮件消费者

channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS);
Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_email", "email");
//交换机和队列绑定
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
//指定消费队列
channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);
复制代码

3)测试

初始Java与RabbitMQ(三)

RPC

初始Java与RabbitMQ(三)

RPC即客户端远程调用服务端的方法,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

1、客户端即是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果

3、服务端将RPC方法 的结果发送到RPC响应队列

4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

  • GitHub代码地址
  • 消费者
  • github.com/flagest/onl…
  • 生产者
  • github.com/flagest/onl…
原文  https://juejin.im/post/5e6f8ae2518825495e1066d3
正文到此结束
Loading...