在学习 Hystrix
的请求缓存与请求合并过程中,不禁产生疑问,如何实现基于一个类似于“ ThreadLocal
变量”,但上下文运用范围为 Request
维度,也就是“ HystrixRequestContext
”。
一个 Request
,即 Command
可以由多个线程执行,举个例子,我们使用线程A和线程B处理 TestCommand
,应该属于两次 TestCommand
执行,A线程和B线程是怎么共用的缓存,减少了额外的逻辑执行?
结合 Hystrix请求合并与请求缓存(一):请求缓存
、 Hystrix请求合并与请求缓存(二):请求合并
中的分析, HystrixRequestContext
怎么如注释所说,实现 request scoped
?
Contains the state and manages the lifecycle of {@link HystrixRequestVariableDefault} objects that provide request scoped (rather than only thread scoped) variables so that multiple threads within a single request can share state.
HystrixCollapser
来实现的,执行合并请求通过使用 HystrixContextCallable
来实现。 CollapsedTask
构造器中一段注释, 即运行过程中,使用上一级线程的上下文,例如Tomcat线程
。 // this gets executed from the context of a HystrixCommand parent thread (such as a Tomcat thread) 复制代码
// HystrixContextRunnable是个Runnable,一个可用于执行的任务 public class HystrixContextRunnable implements Runnable { private final Callable<Void> actual; private final HystrixRequestContext parentThreadState; public HystrixContextRunnable(Runnable actual) { this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual); } public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) { // 获取当前线程的HystrixRequestContext this(concurrencyStrategy, HystrixRequestContext.getContextForCurrentThread(), actual); } // 关键的构造器 public HystrixContextRunnable(final HystrixConcurrencyStrategy concurrencyStrategy, final HystrixRequestContext hystrixRequestContext, final Runnable actual) { // 将原始Callable装饰, 创建了一个新的callable this.actual = concurrencyStrategy.wrapCallable(new Callable<Void>() { @Override public Void call() throws Exception { actual.run(); return null; } }); // 存储当前线程的hystrixRequestContext this.parentThreadState = hystrixRequestContext; } @Override public void run() { // 运行实际的Runnable之前先保存当前线程已有的HystrixRequestContext HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread(); try { // 设置当前线程的HystrixRequestContext,来自上一级线程,因此两个线程是同一个HystrixRequestContext HystrixRequestContext.setContextOnCurrentThread(parentThreadState); try { actual.call(); } catch (Exception e) { throw new RuntimeException(e); } } finally { // 还原当前线程的HystrixRequestContext HystrixRequestContext.setContextOnCurrentThread(existingState); } } } 复制代码
HystrixRequestContext
替换为了上一级线程的 HystrixRequestContext
,执行后还原,实现了 HystrixRequestContext
的传递。 public class DemoRequestContext { // ThreadLoacal private static ThreadLocal<DemoRequestContext> demo = new ThreadLocal<>(); // 线程上下文,存储map private ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>(); private DemoRequestContext() { } // 初始化上下文 public static DemoRequestContext initializeContext() { DemoRequestContext context = new DemoRequestContext(); demo.set(context); return context; } // 设置当前线程的上下文 public static void setContextOnCurrentThread(DemoRequestContext state) { demo.set(state); } // 获取当前线程的上下文 public static DemoRequestContext getContextForCurrentThread() { DemoRequestContext context = demo.get(); if (context != null && context.concurrentHashMap != null) { return context; } else { return null; } } // 存储当前线程需要存储的数据 key-value public void set(String key, String value) { if (DemoRequestContext.getContextForCurrentThread() == null) { throw new IllegalArgumentException(DemoRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used."); } DemoRequestContext.getContextForCurrentThread().concurrentHashMap.put(key, value); } // 根据key获取当前线程上下文中的值 public String get(String key) { if (DemoRequestContext.getContextForCurrentThread() == null) { throw new IllegalArgumentException(DemoRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used."); } ConcurrentHashMap<String, String> variableMap = DemoRequestContext.getContextForCurrentThread().concurrentHashMap; return variableMap.get(key); } } 复制代码
public class DemoContextCallable<V> implements Callable<V> { private final Callable<V> callable; private final DemoRequestContext parentThreadState; // 构造时 将上一级线程上下文注入 public DemoContextCallable(Callable<V> callable) { this.callable = callable; this.parentThreadState = DemoRequestContext.getContextForCurrentThread(); } // 替换线程上下文的操作 @Override public V call() throws Exception { DemoRequestContext context = DemoRequestContext.getContextForCurrentThread(); try { DemoRequestContext.setContextOnCurrentThread(parentThreadState); return callable.call(); } finally { DemoRequestContext.setContextOnCurrentThread(context); } } } 复制代码
public class ContextTest { private static final ExecutorService executorService = Executors.newCachedThreadPool(); public static void main(String[] args) { // 主线程的context DemoRequestContext.initializeContext(); // 主线程存储的key-value DemoRequestContext context = DemoRequestContext.getContextForCurrentThread(); context.set("name", "parentThread"); DemoContextCallable<String> contextCallable = new DemoContextCallable<String>(new Callable<String>() { @Override public String call() throws Exception { // 子线程中取出上下文内容 return DemoRequestContext.getContextForCurrentThread().get("name"); } }); // 线程池运行 List<Future<String>> list = Lists.newArrayList(); for (int i = 0; i < 3; i++) { Future<String> future= executorService.submit(contextCallable); list.add(future); } for (Future<String> future : list) { try { System.out.println(future.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } } } 复制代码
parentThread parentThread parentThread 复制代码
一直对 HystrixRequestContext
的实现原理很困惑,官方推荐是在 Filter
中初始化并 shutdown
,因此有以下理解。
HystrixRequestContext
是一次 HttpRequest
中的上下文,一次请求中可能有有多个线程执行多个 Command
。 HystrixRequestCache
和 HystrixRequestContext
生命周期一致,都是一次 HttpRequest
。