转载

Redis学习笔记十六【使用Redis构建应用程序组件-消息拉取】

多个客户端在互相发送和接收消息的时候,通常会使用以下两种方式来传递消息。第一种时消息推送,也就是由消息发送者来确保所有接受者已经成功接收到了消息,Redis内置了PUBLISH和SUBSCRIBE命令可以实现;第二种时消息拉取,这种方法要求接收者自己去获取存储在某种mailbox里的消息。尽管消息推送非常有用,但是客户端因为某些原因而没办法一直保持在线的时候,采用这一消息传递方法的程序就会出现各种各样的问题。本章节将尝试编写不同的消息拉取方式,来替代PUBLISH和SUBSCRIBE。

单接收者消息的发送与订阅替代品

假设现在打算开发一个移动通信程序,这个应用通过连接服务器来发送和接受雷系短信或彩信的消息,基本上就是一个文字短信和图片彩信的替代品。每条消息都只会被发送至一个客户端,这一点极大地简化了我们要解决的问题。可以为每个移动客户端使用一个列表结构,发送者会把消息放到接受者的列表中,而接受者客户端则通过发送请求来获取最新的消息。数据格式如下:

  • key
    • mailbox:bboyjing
  • list value
    • “{“sendder”:”dora”,”msg”:”Hi I’m dora.”}”
    • ××××××

该示例的代码实现比较简单,之前也已经学过如何对列表进行推入和弹出操作,这里就不再给出代码实现了。

多接受者消息的发送与订阅替代品

单个接受者的消息传递已经满足不了需求,现在要实现一个群组聊天功能,和之前一样,因为应用程序的客户端可能会载人和时候进行连接或者断开连接,所以还是不能使用内置的PUBLISH和SUBSCRIBE。

每个新创建的群组都会有一些初始用户,每个用户都可以按照自己的意愿来参加或者离开群组。群组使用有序集合来记录参加群组的用户,其中有序集合的成员为用户的名字,分值时用户在群组内接收到的最大消息ID。用户也会使用有序集合来记录自己参加的所有群组,其中有序集合的成员为群组ID,分值是用户在群组内接收到的最大消息ID。数据格式如下:

  • key
    • chat:001
  • zset value
    • member : john | score : 5
    • member : jeff | score : 6
  • key
    • chat:002
  • zset value
    • member : michelle | score : 10
    • member : jason | score : 10
    • member : jenny | score : 11
  • key
    • member:jason
  • zset value
    • member : 001 | score : 5
    • member : 002 | score : 6
  • key
    • member:jeff
  • zset value
    • member : 001 | score : 6

以上数据例子表示jason和jeff都参加了001群组,其中用户jason看了6条群组消息中的5条。

创建群组聊天会话

publicStringcreateChat(String sender, Set<String> recipients, String message){
    //通过全局计数器来获取一个新的群组ID
    Long chatId = stringRedisTemplate.opsForValue().increment("ids:chat:", 1l);

    //将发送者也添加到群组成员中
    recipients.add(sender);

    stringRedisTemplate.execute(new SessionCallback<List<Object>>() {
        publicList<Object>execute(RedisOperations operations)throwsDataAccessException{
            operations.multi();
            recipients.forEach(recipient -> {
                //将用户添加到群组中,并将这些用户在群组中最大已读消息ID初始化为0
                stringRedisTemplate.opsForZSet().add("chat:" + chatId, recipient, 0);
                //将群组ID添加到用户已参加群组的有序集合中
                stringRedisTemplate.opsForZSet().add("member:" + recipient, String.valueOf(chatId),0);
            });
            return operations.exec();
        }
    });

    //发送一条初始化消息
    return "";
}

发送消息

publicStringsendMessage(String chatId, String sender, String message){
    // 使用锁来消除竞争条件,保证消息的读取和插入的顺序一致
    String identifier = timeoutLockService.acquireLockWithTimeout("chat:" + chatId);
    if (identifier == null){
        throw new RuntimeException("Couldn't get the lock");
    }
    try{
        //获取消息ID
        Long messageId = stringRedisTemplate.opsForValue().increment("ids:message:" + chatId, 1l);
        //将消息添加到消息有序集合中
        JSONObject values = new JSONObject();
        values.put("id", messageId);
        values.put("ts", System.currentTimeMillis());
        values.put("sender", sender);
        values.put("message", message);
        stringRedisTemplate.opsForZSet().add("msgs:" + chatId, values.toJSONString(), messageId);
    }finally {
        firstLockService.releaseLock("chat:" + chatId, identifier);
    }

    return chatId;
}

获取消息

publicvoidfetchPendingMessages(String recipient){
    // 获取组员的群组ID以及在各组中目前收到的消息的最大ID
    Set<ZSetOperations.TypedTuple<String>> memberSet =
            stringRedisTemplate.opsForZSet().rangeWithScores("member:" + recipient, 0, -1);

    // 获取各聊天组未读消息(分值大于上面获取的最大消息ID)
    List<Object> results = stringRedisTemplate.execute(new SessionCallback<List<Object>>() {
        publicList<Object>execute(RedisOperations operations)throwsDataAccessException{
            operations.multi();
            memberSet.forEach(member -> {
                String chatId = member.getValue();
                double messageId = member.getScore();
                operations.opsForZSet().rangeByScore("msgs:" + chatId, ++messageId, Double.MAX_VALUE);
            });
            return operations.exec();
        }
    });

    //遍历未读消息
    stringRedisTemplate.execute(new SessionCallback<List<Object>>() {
        publicList<Object>execute(RedisOperations operations)throwsDataAccessException{
            operations.multi();
            int i = 0;
            for(ZSetOperations.TypedTuple<String> member : memberSet){
                Set<String> messages = (Set<String>) results.get(i++);
                System.out.println("聊天组:" + member.getValue() + ",有如下未读消息");
                messages.forEach(message ->
                        System.out.println(JSONObject.parseObject(message).getString("message")));
                //修改群组成员读取的最大消息ID
                operations.opsForZSet().incrementScore(
                        "member:" + recipient,
                        member.getValue(),
                        messages.size());
                //修改群组有序集合中成员读取的最大消息ID
                operations.opsForZSet().incrementScore(
                        "chat:" + member.getValue(),
                        recipient,
                        messages.size());
            }
            return operations.exec();
        }
    });
}
原文  http://bboyjing.github.io/2016/12/28/Redis学习笔记十六【使用Redis构建应用程序组件-消息拉取】/
正文到此结束
Loading...