转载

Redis-消息队列

在网站开发中,当页面需要进行如发送邮件、发送短信、复杂数据运算等耗时较长的操作时会阻塞页面的渲染。为了避免用户等待太久,应该使用独立的线程来完成这类操作。不过一些编程语言或框架不易实现多线程,这时很容易就会想到通过其他进程来实现。设想有一个进程能够完成发邮件的功能,那么在页面中只需要想办法通知这个进程向指定的地址发送邮件就可以了。

更多的时候,服务器做的额外的事情,并不需要客户端等待,这时候就可以把这些额外的事情异步去做。处理异步任务的工具很多。主要还是处理通知消息,针对通知消息通常采用的是队列结构。这个异步处理的模型可以抽象为生产者和消费者模型。

redis中提供了两种方式来做消息队列,一个是使用生产者消费者模式,一种是使用发布订阅者模式。生产者消费者模式会让一个或者多个客户端监听消息队列,一旦有队列中有消息,消费者会马上去消费,谁先获得这个消息,谁就去处理。如果队列为空,则消费者继续监听。发布订阅者模式也是使用了一个或者多个客户端订阅消息频道,只要发布者发布了消息,所有的订阅者都能收到消息。

这里我们先看下生产者和消费者模型的消息队列。

队列

与任务队列进行交互的实体有两类:

  • 生产者(producer):生产者会将需要处理的任务放入任务队列中
  • 消费者(consumer):消费者不断从任务队列中读入任务信息并执行

使用任务队列的好处

  1. 松耦合
    生产者和消费者无需知道彼此的实现细节,只需要约定好任务的描述格式。这使得生产者和消费者可以由不同的团队使用不同的编程语言来进行开发。
  2. 易扩展
    可以很方便的将消费者阔这到很多个,而且可以分布在不同的服务器中。

普通队列

在redis中,可以使用列表(list)类型来实现。使用LPUSH和RPOP命令,

生产者将任务使用LPUSH命令加入到某个键中,消费者不断的使用RPOP命令从该键中取出任务。

代码如下:

while (true) {
String task = jedis.rpop("queue.task");
if(task!=null && task.length()>0){
execute(task);
}else{
Thread.sleep(1000);
}
}

上面是一个很简单的任务队列的程序,不过上面的代码还是有点不太完美的地方:当任务队列中没有任务的时候,消费者每秒都会调用一次RPOP命令查看是否有新的任务。这里我们可以借助BRPOP命令来实现一旦有新的任务队列就通知消费者,这样消费者每秒都去查看任务队列中是否有任务了。

上面的程序可以修改为:

while (true) {
List<String> tasks = jedis.brpop(10, "queue.task");
if (tasks != null && tasks.size() > 0) {
execute(tasks.get(1)); // 这里要获取index为1的数据,因为index为0的数据是该队列的名字
}
}

BRPOP 命令和 RPOP 命令相似,唯一的区别就是当列表中没有元数据的时候,BRPOP命令会一直阻塞住连接,直到有新元素加入。

BRPOP命令接收两个参数,第一个是键名,第二个是超时时间(单位:s)。当超过了这个时间,任然没有获得到新的数据的话,就会返回nil。当设置超时时间为0的时候,表示不限制等待时间,也就是说没有新数据加入的时候,就会永远阻塞下去。

redis还提供了BLPOP命令,和BRPOP的区别在与从队列取数据时,BLPOP会从队列左边开始取数据,而BRPOP是从队列的右边开始取数据。

优先级队列

在产品给用户发送短信的时候,有验证短信和营销短信,同样都是将短信添加到任务队列,供消费者进行处理。在没有优先级队列的时候,系统现在正在给用户发送营销短信,这是有一个用户注册了,系统要给用户发送验证短信,但是系统正在处理营销短信,按照普通任务队列,验证短信,要在营销短信发送完成之后,才开始发送,这样多影响用户体验。要是不管在什么时候,只要有发送验证短信的任务,就能立刻去处理掉,这样就好啦。

这样我们就需要一个具有优先级的任务队列啦,按照我们的业务逻辑,优先处理重要的任务。

在redis中, BRPOP 命令可以同时接收多个键,完整的命令格式如下

BRPOP queue.task.1 queue.task.2 0

它可以同时检测多个键,如果所有的键都没有数据则阻塞,如果其中有一个键有数据则会返回响应的数据。如果多个键都有数据,则按照命令中从左到右的键的顺序取第一个有数据的键中对应的数据。这样我们就可以实现一个具有优先级的任务队列了。

代码如下:

while (true) {
List<String> tasks = jedis.brpop(10, "queue.task.1", "queue.task.2");
if (tasks != null && tasks.size() > 0) {
execute(tasks.get(1));
}
}
原文  http://dxer.github.io/2016/03/20/redis-消息队列/
正文到此结束
Loading...