多个客户端在互相发送和接收消息的时候,通常会使用以下两种方式来传递消息。第一种时消息推送,也就是由消息发送者来确保所有接受者已经成功接收到了消息,Redis内置了PUBLISH和SUBSCRIBE命令可以实现;第二种时消息拉取,这种方法要求接收者自己去获取存储在某种mailbox里的消息。尽管消息推送非常有用,但是客户端因为某些原因而没办法一直保持在线的时候,采用这一消息传递方法的程序就会出现各种各样的问题。本章节将尝试编写不同的消息拉取方式,来替代PUBLISH和SUBSCRIBE。
假设现在打算开发一个移动通信程序,这个应用通过连接服务器来发送和接受雷系短信或彩信的消息,基本上就是一个文字短信和图片彩信的替代品。每条消息都只会被发送至一个客户端,这一点极大地简化了我们要解决的问题。可以为每个移动客户端使用一个列表结构,发送者会把消息放到接受者的列表中,而接受者客户端则通过发送请求来获取最新的消息。数据格式如下:
该示例的代码实现比较简单,之前也已经学过如何对列表进行推入和弹出操作,这里就不再给出代码实现了。
单个接受者的消息传递已经满足不了需求,现在要实现一个群组聊天功能,和之前一样,因为应用程序的客户端可能会载人和时候进行连接或者断开连接,所以还是不能使用内置的PUBLISH和SUBSCRIBE。
每个新创建的群组都会有一些初始用户,每个用户都可以按照自己的意愿来参加或者离开群组。群组使用有序集合来记录参加群组的用户,其中有序集合的成员为用户的名字,分值时用户在群组内接收到的最大消息ID。用户也会使用有序集合来记录自己参加的所有群组,其中有序集合的成员为群组ID,分值是用户在群组内接收到的最大消息ID。数据格式如下:
以上数据例子表示jason和jeff都参加了001群组,其中用户jason看了6条群组消息中的5条。
publicStringcreateChat(String sender, Set<String> recipients, String message){ //通过全局计数器来获取一个新的群组ID Long chatId = stringRedisTemplate.opsForValue().increment("ids:chat:", 1l); //将发送者也添加到群组成员中 recipients.add(sender); stringRedisTemplate.execute(new SessionCallback<List<Object>>() { publicList<Object>execute(RedisOperations operations)throwsDataAccessException{ operations.multi(); recipients.forEach(recipient -> { //将用户添加到群组中,并将这些用户在群组中最大已读消息ID初始化为0 stringRedisTemplate.opsForZSet().add("chat:" + chatId, recipient, 0); //将群组ID添加到用户已参加群组的有序集合中 stringRedisTemplate.opsForZSet().add("member:" + recipient, String.valueOf(chatId),0); }); return operations.exec(); } }); //发送一条初始化消息 return ""; }
publicStringsendMessage(String chatId, String sender, String message){ // 使用锁来消除竞争条件,保证消息的读取和插入的顺序一致 String identifier = timeoutLockService.acquireLockWithTimeout("chat:" + chatId); if (identifier == null){ throw new RuntimeException("Couldn't get the lock"); } try{ //获取消息ID Long messageId = stringRedisTemplate.opsForValue().increment("ids:message:" + chatId, 1l); //将消息添加到消息有序集合中 JSONObject values = new JSONObject(); values.put("id", messageId); values.put("ts", System.currentTimeMillis()); values.put("sender", sender); values.put("message", message); stringRedisTemplate.opsForZSet().add("msgs:" + chatId, values.toJSONString(), messageId); }finally { firstLockService.releaseLock("chat:" + chatId, identifier); } return chatId; }
publicvoidfetchPendingMessages(String recipient){ // 获取组员的群组ID以及在各组中目前收到的消息的最大ID Set<ZSetOperations.TypedTuple<String>> memberSet = stringRedisTemplate.opsForZSet().rangeWithScores("member:" + recipient, 0, -1); // 获取各聊天组未读消息(分值大于上面获取的最大消息ID) List<Object> results = stringRedisTemplate.execute(new SessionCallback<List<Object>>() { publicList<Object>execute(RedisOperations operations)throwsDataAccessException{ operations.multi(); memberSet.forEach(member -> { String chatId = member.getValue(); double messageId = member.getScore(); operations.opsForZSet().rangeByScore("msgs:" + chatId, ++messageId, Double.MAX_VALUE); }); return operations.exec(); } }); //遍历未读消息 stringRedisTemplate.execute(new SessionCallback<List<Object>>() { publicList<Object>execute(RedisOperations operations)throwsDataAccessException{ operations.multi(); int i = 0; for(ZSetOperations.TypedTuple<String> member : memberSet){ Set<String> messages = (Set<String>) results.get(i++); System.out.println("聊天组:" + member.getValue() + ",有如下未读消息"); messages.forEach(message -> System.out.println(JSONObject.parseObject(message).getString("message"))); //修改群组成员读取的最大消息ID operations.opsForZSet().incrementScore( "member:" + recipient, member.getValue(), messages.size()); //修改群组有序集合中成员读取的最大消息ID operations.opsForZSet().incrementScore( "chat:" + member.getValue(), recipient, messages.size()); } return operations.exec(); } }); }