Spring Cloud Hystrix 是Spring Cloud Netflix 子项目的核心组件之一,具有服务容错及线程隔离等一系列服务保护功能,本文将对其用法进行详细介绍。
在微服务架构中,服务与服务之间通过远程调用的方式进行通信,一旦某个被调用的服务发生了故障,其依赖服务也会发生故障,此时就会发生故障的蔓延,最终导致系统瘫痪。Hystrix实现了断路器模式,当某个服务发生故障时,通过断路器的监控,给调用方返回一个错误响应,而不是长时间的等待,这样就不会使得调用方由于长时间得不到响应而占用线程,从而防止故障的蔓延。Hystrix具备服务降级、服务熔断、线程隔离、请求缓存、请求合并及服务监控等强大功能。
这里我们创建一个hystrix-service模块来演示hystrix的常用功能。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> 复制代码
主要是配置了端口、注册中心地址及user-service的调用路径。
server: port: 8401 spring: application: name: hystrix-service eureka: client: register-with-eureka: true fetch-registry: true service-url: defaultZone: http://localhost:8001/eureka/ service-url: user-service: http://user-service 复制代码
@EnableCircuitBreaker @EnableDiscoveryClient @SpringBootApplication public class HystrixServiceApplication { public static void main(String[] args) { SpringApplication.run(HystrixServiceApplication.class, args); } 复制代码
@GetMapping("/testFallback/{id}") public CommonResult testFallback(@PathVariable Long id) { return userService.getUser(id); } 复制代码
@HystrixCommand(fallbackMethod = "getDefaultUser") public CommonResult getUser(Long id) { return restTemplate.getForObject(userServiceUrl + "/user/{1}", CommonResult.class, id); } public CommonResult getDefaultUser(@PathVariable Long id) { User defaultUser = new User(-1L, "defaultUser", "123456"); return new CommonResult<>(defaultUser); } 复制代码
@GetMapping("/testCommand/{id}") public CommonResult testCommand(@PathVariable Long id) { return userService.getUserCommand(id); } 复制代码
@HystrixCommand(fallbackMethod = "getDefaultUser", commandKey = "getUserCommand", groupKey = "getUserGroup", threadPoolKey = "getUserThreadPool") public CommonResult getUserCommand(@PathVariable Long id) { return restTemplate.getForObject(userServiceUrl + "/user/{1}", CommonResult.class, id); } 复制代码
@GetMapping("/testException/{id}") public CommonResult testException(@PathVariable Long id) { return userService.getUserException(id); } 复制代码
@HystrixCommand(fallbackMethod = "getDefaultUser2", ignoreExceptions = {NullPointerException.class}) public CommonResult getUserException(Long id) { if (id == 1) { throw new IndexOutOfBoundsException(); } else if (id == 2) { throw new NullPointerException(); } return restTemplate.getForObject(userServiceUrl + "/user/{1}", CommonResult.class, id); } public CommonResult getDefaultUser2(@PathVariable Long id, Throwable e) { LOGGER.error("getDefaultUser2 id:{},throwable class:{}", id, e.getClass()); User defaultUser = new User(-2L, "defaultUser2", "123456"); return new CommonResult<>(defaultUser); } 复制代码
当系统并发量越来越大时,我们需要使用缓存来优化系统,达到减轻并发请求线程数,提供响应速度的效果。
@GetMapping("/testCache/{id}") public CommonResult testCache(@PathVariable Long id) { userService.getUserCache(id); userService.getUserCache(id); userService.getUserCache(id); return new CommonResult("操作成功", 200); } 复制代码
@CacheResult(cacheKeyMethod = "getCacheKey") @HystrixCommand(fallbackMethod = "getDefaultUser", commandKey = "getUserCache") public CommonResult getUserCache(Long id) { LOGGER.info("getUserCache id:{}", id); return restTemplate.getForObject(userServiceUrl + "/user/{1}", CommonResult.class, id); } /** * 为缓存生成key的方法 */ public String getCacheKey(Long id) { return String.valueOf(id); } 复制代码
@GetMapping("/testRemoveCache/{id}") public CommonResult testRemoveCache(@PathVariable Long id) { userService.getUserCache(id); userService.removeCache(id); userService.getUserCache(id); return new CommonResult("操作成功", 200); } 复制代码
@CacheRemove(commandKey = "getUserCache", cacheKeyMethod = "getCacheKey") @HystrixCommand public CommonResult removeCache(Long id) { LOGGER.info("removeCache id:{}", id); return restTemplate.postForObject(userServiceUrl + "/user/delete/{1}", null, CommonResult.class, id); } 复制代码
java.lang.IllegalStateException: Request caching is not available. Maybe you need to initialize the HystrixRequestContext? at com.netflix.hystrix.HystrixRequestCache.get(HystrixRequestCache.java:104) ~[hystrix-core-1.5.18.jar:1.5.18] at com.netflix.hystrix.AbstractCommand$7.call(AbstractCommand.java:478) ~[hystrix-core-1.5.18.jar:1.5.18] at com.netflix.hystrix.AbstractCommand$7.call(AbstractCommand.java:454) ~[hystrix-core-1.5.18.jar:1.5.18] 复制代码
/** * Created by macro on 2019/9/4. */ @Component @WebFilter(urlPatterns = "/*",asyncSupported = true) public class HystrixRequestContextFilter implements Filter { @Override public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { filterChain.doFilter(servletRequest, servletResponse); } finally { context.close(); } } } 复制代码
微服务系统中的服务间通信,需要通过远程调用来实现,随着调用次数越来越多,占用线程资源也会越来越多。Hystrix中提供了@HystrixCollapser用于合并请求,从而达到减少通信消耗及线程数量的效果。
@GetMapping("/testCollapser") public CommonResult testCollapser() throws ExecutionException, InterruptedException { Future<User> future1 = userService.getUserFuture(1L); Future<User> future2 = userService.getUserFuture(2L); future1.get(); future2.get(); ThreadUtil.safeSleep(200); Future<User> future3 = userService.getUserFuture(3L); future3.get(); return new CommonResult("操作成功", 200); } 复制代码
@HystrixCollapser(batchMethod = "getUserByIds",collapserProperties = { @HystrixProperty(name = "timerDelayInMilliseconds", value = "100") }) public Future<User> getUserFuture(Long id) { return new AsyncResult<User>(){ @Override public User invoke() { CommonResult commonResult = restTemplate.getForObject(userServiceUrl + "/user/{1}", CommonResult.class, id); Map data = (Map) commonResult.getData(); User user = BeanUtil.mapToBean(data,User.class,true); LOGGER.info("getUserById username:{}", user.getUsername()); return user; } }; } @HystrixCommand public List<User> getUserByIds(List<Long> ids) { LOGGER.info("getUserByIds:{}", ids); CommonResult commonResult = restTemplate.getForObject(userServiceUrl + "/user/getUserByIds?ids={1}", CommonResult.class, CollUtil.join(ids,",")); return (List<User>) commonResult.getData(); } 复制代码
hystrix: command: #用于控制HystrixCommand的行为 default: execution: isolation: strategy: THREAD #控制HystrixCommand的隔离策略,THREAD->线程池隔离策略(默认),SEMAPHORE->信号量隔离策略 thread: timeoutInMilliseconds: 1000 #配置HystrixCommand执行的超时时间,执行超过该时间会进行服务降级处理 interruptOnTimeout: true #配置HystrixCommand执行超时的时候是否要中断 interruptOnCancel: true #配置HystrixCommand执行被取消的时候是否要中断 timeout: enabled: true #配置HystrixCommand的执行是否启用超时时间 semaphore: maxConcurrentRequests: 10 #当使用信号量隔离策略时,用来控制并发量的大小,超过该并发量的请求会被拒绝 fallback: enabled: true #用于控制是否启用服务降级 circuitBreaker: #用于控制HystrixCircuitBreaker的行为 enabled: true #用于控制断路器是否跟踪健康状况以及熔断请求 requestVolumeThreshold: 20 #超过该请求数的请求会被拒绝 forceOpen: false #强制打开断路器,拒绝所有请求 forceClosed: false #强制关闭断路器,接收所有请求 requestCache: enabled: true #用于控制是否开启请求缓存 collapser: #用于控制HystrixCollapser的执行行为 default: maxRequestsInBatch: 100 #控制一次合并请求合并的最大请求数 timerDelayinMilliseconds: 10 #控制多少毫秒内的请求会被合并成一个 requestCache: enabled: true #控制合并请求是否开启缓存 threadpool: #用于控制HystrixCommand执行所在线程池的行为 default: coreSize: 10 #线程池的核心线程数 maximumSize: 10 #线程池的最大线程数,超过该线程数的请求会被拒绝 maxQueueSize: -1 #用于设置线程池的最大队列大小,-1采用SynchronousQueue,其他正数采用LinkedBlockingQueue queueSizeRejectionThreshold: 5 #用于设置线程池队列的拒绝阀值,由于LinkedBlockingQueue不能动态改版大小,使用时需要用该参数来控制线程数 复制代码
实例配置只需要将全局配置中的default换成与之对应的key即可。
hystrix: command: HystrixComandKey: #将default换成HystrixComrnandKey execution: isolation: strategy: THREAD collapser: HystrixCollapserKey: #将default换成HystrixCollapserKey maxRequestsInBatch: 100 threadpool: HystrixThreadPoolKey: #将default换成HystrixThreadPoolKey coreSize: 10 复制代码
springcloud-learning ├── eureka-server -- eureka注册中心 ├── user-service -- 提供User对象CRUD接口的服务 └── hystrix-service -- hystrix服务调用测试服务 复制代码