转载

Spring Cloud中Hystrix的请求合并

在微服务架构中,我们将一个项目拆分成很多个独立的模块,这些独立的模块通过远程调用来互相配合工作,但是,在高并发情况下,通信次数的增加会导致总的通信时间增加,同时,线程池的资源也是有限的,高并发环境会导致有大量的线程处于等待状态,进而导致响应延迟,为了解决这些问题,我们需要来了解Hystrix的请求合并。

本文是Spring Cloud系列的第十四篇文章,了解前十三篇文章内容有助于更好的理解本文:

1. 使用Spring Cloud搭建服务注册中心

2. 使用Spring Cloud搭建高可用服务注册中心

3. Spring Cloud中服务的发现与消费

4. Eureka中的核心概念

5. 什么是客户端负载均衡

6. Spring RestTemplate中几种常见的请求方式

7. RestTemplate的逆袭之路,从发送请求到负载均衡

8. Spring Cloud中负载均衡器概览

9. Spring Cloud中的负载均衡策略

10. Spring Cloud中的断路器Hystrix

11. Spring Cloud自定义Hystrix请求命令

12. Spring Cloud中Hystrix的服务降级与异常处理

13. Spring Cloud中Hystrix的请求缓存

Hystrix中的请求合并,就是利用一个合并处理器,将对同一个服务发起的连续请求合并成一个请求进行处理(这些连续请求的时间窗默认为10ms),在这个过程中涉及到的一个核心类就是HystrixCollapser,OK,接下来我们就来看看如何实现Hystrix的请求合并。由于本文是在前面十三篇的基础上创作的,因此我这里不再赘述整个环境的搭建过程,不熟悉的小伙伴可以翻看前面的文章。

服务提供者接口

我需在在服务提供者中提供两个接口供服务消费者调用,如下:

@RequestMapping("/getbook6")
public List<Book> book6(String ids) {
    System.out.println("ids>>>>>>>>>>>>>>>>>>>>>" + ids);
    ArrayList<Book> books = new ArrayList<>();
    books.add(new Book("《李自成》", 55, "姚雪垠", "人民文学出版社"));
    books.add(new Book("中国文学简史", 33, "林庚", "清华大学出版社"));
    books.add(new Book("文学改良刍议", 33, "胡适", "无"));
    books.add(new Book("ids", 22, "helloworld", "haha"));
    return books;
}

@RequestMapping("/getbook6/{id}")
public Book book61(@PathVariable Integer id) {
    Book book = new Book("《李自成》2", 55, "姚雪垠2", "人民文学出版社2");
    return book;
}

第一个接口是一个批处理接口,第二个接口是一个处理单个请求的接口。在批处理接口中,服务消费者传来的ids参数格式是1,2,3,4…这种格式,正常情况下我们需要根据ids查询到对应的数据,然后组装成一个集合返回,我这里为了处理方便,不管什么样的请求统统都返回一样的数据集;处理单个请求的接口就比较简单了,不再赘述。

服务消费者

OK,服务提供者处理好之后,接下来我们来看看服务消费者要怎么处理。

BookService

首先在BookService中添加两个方法用来调用服务提供者提供的接口,如下:

public Book test8(Long id) {
    return restTemplate.getForObject("http://HELLO-SERVICE/getbook6/{1}", Book.class, id);
}

public List<Book> test9(List<Long> ids) {
    System.out.println("test9---------"+ids+"Thread.currentThread().getName():" + Thread.currentThread().getName());
    Book[] books = restTemplate.getForObject("http://HELLO-SERVICE/getbook6?ids={1}", Book[].class, StringUtils.join(ids, ","));
    return Arrays.asList(books);
}

test8用来调用提供单个id的接口,test9用来调用批处理的接口,在test9中,我将test9执行时所处的线程打印出来,方便我们观察执行结果,另外,在RestTemplate中,如果返回值是一个集合,我们得先用一个数组接收,然后再转为集合(或许也有其他办法,小伙伴们有更好的建议可以提)。

BookBatchCommand

OK,BookService中的方法准备好了后,我们就可以来创建一个BookBatchCommand,这是一个批处理命令,如下:

public class BookBatchCommand extends HystrixCommand<List<Book>> {
    private List<Long> ids;
    private BookService bookService;

    public BookBatchCommand(List<Long> ids, BookService bookService) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CollapsingGroup"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("CollapsingKey")));
        this.ids = ids;
        this.bookService = bookService;
    }

    @Override
    protected List<Book> run() throws Exception {
        return bookService.test9(ids);
    }
}

这个类实际上和我们在上篇博客中介绍的类差不多,都是继承自HystrixCommand,用来处理合并之后的请求,在run方法中调用BookService中的test9方法。

BookCollapseCommand

接下来我们需要创建BookCollapseCommand继承自HystrixCollapser来实现请求合并。如下:

public class BookCollapseCommand extends HystrixCollapser<List<Book>, Book, Long> {
    private BookService bookService;
    private Long id;

    public BookCollapseCommand(BookService bookService, Long id) {
        super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("bookCollapseCommand")).andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100)));
        this.bookService = bookService;
        this.id = id;
    }

    @Override
    public Long getRequestArgument() {
        return id;
    }

    @Override
    protected HystrixCommand<List<Book>> createCommand(Collection<CollapsedRequest<Book, Long>> collapsedRequests) {
        List<Long> ids = new ArrayList<>(collapsedRequests.size());
        ids.addAll(collapsedRequests.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList()));
        BookBatchCommand bookBatchCommand = new BookBatchCommand(ids, bookService);
        return bookBatchCommand;
    }

    @Override
    protected void mapResponseToRequests(List<Book> batchResponse, Collection<CollapsedRequest<Book, Long>> collapsedRequests) {
        System.out.println("mapResponseToRequests");
        int count = 0;
        for (CollapsedRequest<Book, Long> collapsedRequest : collapsedRequests) {
            Book book = batchResponse.get(count++);
            collapsedRequest.setResponse(book);
        }
    }
}

关于这个类,我说如下几点:

1.首先在构造方法中,我们设置了请求时间窗为100ms,即请求时间间隔在100ms之内的请求会被合并为一个请求。

2.createCommand方法主要用来合并请求,在这里获取到各个单个请求的id,将这些单个的id放到一个集合中,然后再创建出一个BookBatchCommand对象,用该对象去发起一个批量请求。

3.mapResponseToRequests方法主要用来为每个请求设置请求结果。该方法的第一个参数batchResponse表示批处理请求的结果,第二个参数collapsedRequests则代表了每一个被合并的请求,然后我们通过遍历batchResponse来为collapsedRequests设置请求结果。

OK,所有的这些操作完成后,我们就可以来测试啦。

测试

我们在服务消费者端创建访问接口,来测试合并请求,测试接口如下:

@RequestMapping("/test7")
@ResponseBody
public void test7() throws ExecutionException, InterruptedException {
    HystrixRequestContext context = HystrixRequestContext.initializeContext();
    BookCollapseCommand bc1 = new BookCollapseCommand(bookService, 1l);
    BookCollapseCommand bc2 = new BookCollapseCommand(bookService, 2l);
    BookCollapseCommand bc3 = new BookCollapseCommand(bookService, 3l);
    BookCollapseCommand bc4 = new BookCollapseCommand(bookService, 4l);
    Future<Book> q1 = bc1.queue();
    Future<Book> q2 = bc2.queue();
    Future<Book> q3 = bc3.queue();
    Book book1 = q1.get();
    Book book2 = q2.get();
    Book book3 = q3.get();
    Thread.sleep(3000);
    Future<Book> q4 = bc4.queue();
    Book book4 = q4.get();
    System.out.println("book1>>>"+book1);
    System.out.println("book2>>>"+book2);
    System.out.println("book3>>>"+book3);
    System.out.println("book4>>>"+book4);
    context.close();
}

关于这个测试接口我说如下两点:

1.首先要初始化HystrixRequestContext

2.创建BookCollapseCommand类的实例来发起请求,先发送3个请求,然后睡眠3秒钟,再发起1个请求,这样,前3个请求就会被合并为一个请求,第四个请求因为间隔的时间比较久,所以不会被合并,而是单独创建一个线程去处理。

OK,我们来看看执行结果,如下:

Spring Cloud中Hystrix的请求合并

通过注解实现请求合并

OK,上面这种请求合并方式写起来稍微有一点麻烦,我们可以使用注解来更优雅的实现这一功能。首先在BookService中添加两个方法,如下:

@HystrixCollapser(batchMethod = "test11",collapserProperties = {@HystrixProperty(name ="timerDelayInMilliseconds",value = "100")})
public Future<Book> test10(Long id) {
    return null;
}

@HystrixCommand
public List<Book> test11(List<Long> ids) {
    System.out.println("test9---------"+ids+"Thread.currentThread().getName():" + Thread.currentThread().getName());
    Book[] books = restTemplate.getForObject("http://HELLO-SERVICE/getbook6?ids={1}", Book[].class, StringUtils.join(ids, ","));
    return Arrays.asList(books);
}

在test10方法上添加@HystrixCollapser注解实现请求合并,用batchMethod属性指明请求合并后的处理方法,collapserProperties属性指定其他属性。

OK,在BookService中写好之后,直接调用就可以了,如下:

@RequestMapping("/test8")
@ResponseBody
public void test8() throws ExecutionException, InterruptedException {
    HystrixRequestContext context = HystrixRequestContext.initializeContext();
    Future<Book> f1 = bookService.test10(1l);
    Future<Book> f2 = bookService.test10(2l);
    Future<Book> f3 = bookService.test10(3l);
    Book b1 = f1.get();
    Book b2 = f2.get();
    Book b3 = f3.get();
    Thread.sleep(3000);
    Future<Book> f4 = bookService.test10(4l);
    Book b4 = f4.get();
    System.out.println("b1>>>"+b1);
    System.out.println("b2>>>"+b2);
    System.out.println("b3>>>"+b3);
    System.out.println("b4>>>"+b4);
    context.close();
}

和前面的一样,前三个请求会进行合并,第四个请求会单独执行,OK,执行结果如下:

Spring Cloud中Hystrix的请求合并

总结

请求合并的优点小伙伴们已经看到了,多个请求被合并为一个请求进行一次性处理,可以有效节省网络带宽和线程池资源,但是,有优点必然也有缺点,设置请求合并之后,本来一个请求可能5ms就搞定了,但是现在必须再等10ms看看还有没有其他的请求一起的,这样一个请求的耗时就从5ms增加到15ms了,不过,如果我们要发起的命令本身就是一个高延迟的命令,那么这个时候就可以使用请求合并了,因为这个时候时间窗的时间消耗就显得微不足道了,另外高并发也是请求合并的一个非常重要的场景。

Ok,我们的请求合并就说到这里,有问题欢迎小伙伴们留言讨论。

更多JavaEE资料请关注公众号:

Spring Cloud中Hystrix的请求合并

原文  http://blog.csdn.net/u012702547/article/details/78213270
正文到此结束
Loading...