前面的两篇文章( Redis的持久化方案 , 一文掌握Redis的三种集群方案 )分别介绍了Redis的持久化与集群方案 —— 包括主从复制模式、哨兵模式、Cluster模式,其中主从复制模式由于不能自动做故障转移,当节点出现故障时需要人为干预,不满足生产环境的高可用需求,所以在生产环境一般使用哨兵模式或Cluster模式。那么在Spring Boot项目中,如何访问这两种模式的Redis集群,可能遇到哪些问题,是本文即将介绍的内容。
<!--more-->
spring boot中整合Redis非常简单,在pom.xml中添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
spring boot 2的 spring-boot-starter-data-redis
中,默认使用的是lettuce作为redis客户端,它与jedis的主要区别如下:
如果不使用默认的lettuce,使用jedis的话,可以排除lettuce的依赖,手动加入jedis依赖,配置如下
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <exclusions> <exclusion> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency>
在配置文件application.yml中添加配置(针对单实例)
spring: redis: host: 192.168.40.201 port: 6379 password: passw0rd database: 0 # 数据库索引,默认0 timeout: 5000 # 连接超时,单位ms jedis: # 或lettuce, 连接池配置,springboot2.0中使用jedis或者lettuce配置连接池,默认为lettuce连接池 pool: max-active: 8 # 连接池最大连接数(使用负值表示没有限制) max-wait: -1 # 连接池分配连接最大阻塞等待时间(阻塞时间到,抛出异常。使用负值表示无限期阻塞) max-idle: 8 # 连接池中的最大空闲连接数 min-idle: 0 # 连接池中的最小空闲连接数
然后添加配置类。其中@EnableCaching注解是为了使@Cacheable、@CacheEvict、@CachePut、@Caching注解生效
@Configuration @EnableCaching public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(factory); // 使用Jackson2JsonRedisSerialize 替换默认的jdkSerializeable序列化 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); // key采用String的序列化方式 template.setKeySerializer(stringRedisSerializer); // hash的key也采用String的序列化方式 template.setHashKeySerializer(stringRedisSerializer); // value序列化方式采用jackson template.setValueSerializer(jackson2JsonRedisSerializer); // hash的value序列化方式采用jackson template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } }
上述配置类注入了自定义的RedisTemplate<String, Object>, 替换RedisAutoConfiguration中自动配置的RedisTemplate<Object, Object>类(RedisAutoConfiguration另外还自动配置了StringRedisTemplate)。
此时,我们可以通过定义一个基于RedisTemplate的工具类,或通过在Service层添加@Cacheable、@CacheEvict、@CachePut、@Caching注解来使用缓存。比如定义一个RedisService类,封装常用的Redis操作方法,
@Component @Slf4j public class RedisService { @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 指定缓存失效时间 * * @param key 键 * @param time 时间(秒) * @return */ public boolean expire(String key, long time) { try { if (time > 0) { redisTemplate.expire(key, time, TimeUnit.SECONDS); } return true; } catch (Exception e) { log.error("exception when expire key {}. ", key, e); return false; } } /** * 根据key获取过期时间 * * @param key 键 不能为null * @return 时间(秒) 返回0代表为永久有效 */ public long getExpire(String key) { return redisTemplate.getExpire(key, TimeUnit.SECONDS); } /** * 判断key是否存在 * * @param key 键 * @return true 存在 false不存在 */ public boolean hasKey(String key) { try { return redisTemplate.hasKey(key); } catch (Exception e) { log.error("exception when check key {}. ", key, e); return false; } } ... }
出于篇幅,完整代码请查阅本文示例源码: https://github.com/ronwxy/spr...
或在Service层使用注解,如
@Service @CacheConfig(cacheNames = "users") public class UserService { private static Map<String, User> userMap = new HashMap<>(); @CachePut(key = "#user.username") public User addUser(User user){ user.setUid(UUID.randomUUID().toString()); System.out.println("add user: " + user); userMap.put(user.getUsername(), user); return user; } @Caching(put = { @CachePut( key = "#user.username"), @CachePut( key = "#user.uid") }) public User addUser2(User user) { user.setUid(UUID.randomUUID().toString()); System.out.println("add user2: " + user); userMap.put(user.getUsername(), user); return user; } ... }
Spring Boot 2 整合Redis哨兵模式除了配置稍有差异,其它与整合单实例模式类似,配置示例为
spring: redis: password: passw0rd timeout: 5000 sentinel: master: mymaster nodes: 192.168.40.201:26379,192.168.40.201:36379,192.168.40.201:46379 # 哨兵的IP:Port列表 jedis: # 或lettuce pool: max-active: 8 max-wait: -1 max-idle: 8 min-idle: 0
完整示例可查阅源码: https://github.com/ronwxy/spr...
上述配置只指定了哨兵节点的地址与master的名称,但Redis客户端最终访问操作的是master节点,那么Redis客户端是如何获取master节点的地址,并在发生故障转移时,如何自动切换master地址的呢?我们以Jedis连接池为例,通过源码来揭开其内部实现的神秘面纱。
在 JedisSentinelPool 类的构造函数中,对连接池做了初始化,如下
public JedisSentinelPool(String masterName, Set<String> sentinels, final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, final String password, final int database, final String clientName) { this.poolConfig = poolConfig; this.connectionTimeout = connectionTimeout; this.soTimeout = soTimeout; this.password = password; this.database = database; this.clientName = clientName; HostAndPort master = initSentinels(sentinels, masterName); initPool(master); } private HostAndPort initSentinels(Set<String> sentinels, final String masterName) { for (String sentinel : sentinels) { final HostAndPort hap = HostAndPort.parseString(sentinel); log.fine("Connecting to Sentinel " + hap); Jedis jedis = null; try { jedis = new Jedis(hap.getHost(), hap.getPort()); List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName); // connected to sentinel... sentinelAvailable = true; if (masterAddr == null || masterAddr.size() != 2) { log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap + "."); continue; } master = toHostAndPort(masterAddr); log.fine("Found Redis master at " + master); break; } catch (JedisException e) { // resolves #1036, it should handle JedisException there's another chance // of raising JedisDataException log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e + ". Trying next one."); } finally { if (jedis != null) { jedis.close(); } } } //省略了非关键代码 for (String sentinel : sentinels) { final HostAndPort hap = HostAndPort.parseString(sentinel); MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort()); // whether MasterListener threads are alive or not, process can be stopped masterListener.setDaemon(true); masterListeners.add(masterListener); masterListener.start(); } return master; }
initSentinels
方法中主要干了两件事:
get-master-addr-by-name
命令获取master节点的地址信息,找到了就退出循环。 get-master-addr-by-name
命令执行结果如下所示 [root@dev-server-1 master-slave]# redis-cli -p 26379 127.0.0.1:26379> sentinel get-master-addr-by-name mymaster 1) "192.168.40.201" 2) "7001" 127.0.0.1:26379>
+switch-master
频道,当发生故障转移时,客户端能收到哨兵的通知,通过重新初始化连接池,完成主节点的切换。 MasterListener.run方法中监听哨兵部分代码如下
j.subscribe(new JedisPubSub() { @Override public void onMessage(String channel, String message) { log.fine("Sentinel " + host + ":" + port + " published: " + message + "."); String[] switchMasterMsg = message.split(" "); if (switchMasterMsg.length > 3) { if (masterName.equals(switchMasterMsg[0])) { initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]))); } else { log.fine("Ignoring message on +switch-master for master name " + switchMasterMsg[0] + ", our master name is " + masterName); } } else { log.severe("Invalid message received on Sentinel " + host + ":" + port + " on channel +switch-master: " + message); } } }, "+switch-master");
initPool 方法如下:如果发现新的master节点与当前的master不同,则重新初始化。
private void initPool(HostAndPort master) { if (!master.equals(currentHostMaster)) { currentHostMaster = master; if (factory == null) { factory = new JedisFactory(master.getHost(), master.getPort(), connectionTimeout, soTimeout, password, database, clientName, false, null, null, null); initPool(poolConfig, factory); } else { factory.setHostAndPort(currentHostMaster); // although we clear the pool, we still have to check the // returned object // in getResource, this call only clears idle instances, not // borrowed instances internalPool.clear(); } log.info("Created JedisPool to master at " + master); } }
通过以上两步,Jedis客户端在只知道哨兵地址的情况下便能获得master节点的地址信息,并且当发生故障转移时能自动切换到新的master节点地址。
Spring Boot 2 整合Redis Cluster模式除了配置稍有差异,其它与整合单实例模式也类似,配置示例为
spring: redis: password: passw0rd timeout: 5000 database: 0 cluster: nodes: 192.168.40.201:7100,192.168.40.201:7200,192.168.40.201:7300,192.168.40.201:7400,192.168.40.201:7500,192.168.40.201:7600 max-redirects: 3 # 重定向的最大次数 jedis: pool: max-active: 8 max-wait: -1 max-idle: 8 min-idle: 0
完整示例可查阅源码: https://github.com/ronwxy/spr...
在 一文掌握Redis的三种集群方案 中已经介绍了Cluster模式访问的基本原理,可以通过任意节点跳转到目标节点执行命令,上面配置中 max-redirects 控制在集群中跳转的最大次数。
查看JedisClusterConnection的execute方法,
public Object execute(String command, byte[]... args) { Assert.notNull(command, "Command must not be null!"); Assert.notNull(args, "Args must not be null!"); return clusterCommandExecutor .executeCommandOnArbitraryNode((JedisClusterCommandCallback<Object>) client -> JedisClientUtils.execute(command, EMPTY_2D_BYTE_ARRAY, args, () -> client)) .getValue(); }
集群命令的执行是通过 ClusterCommandExecutor.executeCommandOnArbitraryNode
来实现的,
public <T> NodeResult<T> executeCommandOnArbitraryNode(ClusterCommandCallback<?, T> cmd) { Assert.notNull(cmd, "ClusterCommandCallback must not be null!"); List<RedisClusterNode> nodes = new ArrayList<>(getClusterTopology().getActiveNodes()); return executeCommandOnSingleNode(cmd, nodes.get(new Random().nextInt(nodes.size()))); } private <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S, T> cmd, RedisClusterNode node, int redirectCount) { Assert.notNull(cmd, "ClusterCommandCallback must not be null!"); Assert.notNull(node, "RedisClusterNode must not be null!"); if (redirectCount > maxRedirects) { throw new TooManyClusterRedirectionsException(String.format( "Cannot follow Cluster Redirects over more than %s legs. Please consider increasing the number of redirects to follow. Current value is: %s.", redirectCount, maxRedirects)); } RedisClusterNode nodeToUse = lookupNode(node); S client = this.resourceProvider.getResourceForSpecificNode(nodeToUse); Assert.notNull(client, "Could not acquire resource for node. Is your cluster info up to date?"); try { return new NodeResult<>(node, cmd.doInCluster(client)); } catch (RuntimeException ex) { RuntimeException translatedException = convertToDataAccessException(ex); if (translatedException instanceof ClusterRedirectException) { ClusterRedirectException cre = (ClusterRedirectException) translatedException; return executeCommandOnSingleNode(cmd, topologyProvider.getTopology().lookup(cre.getTargetHost(), cre.getTargetPort()), redirectCount + 1); } else { throw translatedException != null ? translatedException : ex; } } finally { this.resourceProvider.returnResourceForSpecificNode(nodeToUse, client); } }
上述代码逻辑如下
检查服务是否正常启动(比如 ps -ef|grep redis
查看进程, netstat -ano|grep 6379
查看端口是否起来,以及日志文件),如果正常启动,则查看Redis服务器是否开启防火墙,关闭防火墙或配置通行端口。
Connection refused: no further information: /127.0.0.1:7600
这是因为在redis.conf中配置 bind 0.0.0.0
或 bind 127.0.0.1
导致,需要改为具体在外部可访问的IP,如 bind 192.168.40.201
。如果之前已经起了集群,并产生了数据,则修改redis.conf文件后,还需要修改cluster-config-file文件,将127.0.0.1替换为bind 的具体IP,然后重启。
如果设置了密码,需要在master, slave的配置文件中都配置 masterauth password
相关阅读:
作者:空山新雨,一枚仍在学习路上的IT老兵
近期作者写了几十篇技术博客,内容包括Java、Spring Boot、Spring Cloud、Docker,技术管理心得等
欢迎关注作者微信公众号:空山新雨的技术空间,一起学习成长