public class FileBrowser { // 发送消息 private void send(Message message) { } // MQ消息返回后调用该方法 public void onMessage(Message message) { } public Response handleWebReq() { Message message = new Message(1L, "123"); // 发送消息 send(message); // 如何等待MQ返回消息? return new Response(); } } @AllArgsConstructor class Message { private Long id; private String content; } class Response { }
/**
 * Guarded Suspension holder: a thread calling {@link #get(Predicate)} blocks
 * until the guarded value satisfies a predicate, and another thread publishes
 * the value through {@link #onChange(Object)}.
 *
 * @param <T> type of the guarded value
 */
public class GuardedObject<T> {
    /** Length, in seconds, of each timed wait before the predicate is re-checked. */
    private static final int TIMEOUT = 1;

    /** The guarded value; read and written only while {@link #lock} is held. */
    private T obj;

    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();

    /**
     * Blocks until the guarded value satisfies {@code p}, then returns it.
     *
     * There is no overall deadline: each wait lasts at most {@link #TIMEOUT}
     * seconds, after which the predicate is simply evaluated again.
     *
     * @param p condition the guarded value must satisfy; it is also evaluated
     *          against the initial {@code null} value
     * @return the value that satisfied {@code p}
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         waiting thread is interrupted; the interrupt flag is restored first
     */
    public T get(Predicate<T> p) {
        lock.lock();
        try {
            // Recommended MESA-monitor form: re-check the condition in a loop after every wake-up.
            while (!p.test(obj)) {
                done.await(TIMEOUT, TimeUnit.SECONDS);
            }
            // Read the result while still holding the lock, so the returned value is
            // exactly the one that passed the predicate and cannot be swapped by a
            // concurrent onChange between unlock and return.
            return obj;
        } catch (InterruptedException e) {
            // await() cleared the interrupt status; restore it so callers further up
            // the stack can still observe that the thread was interrupted.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Event notification: stores the new value and wakes every thread waiting in
     * {@link #get(Predicate)}.
     *
     * @param obj the new guarded value
     */
    public void onChange(T obj) {
        lock.lock();
        try {
            this.obj = obj;
            done.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
/**
 * Guarded Suspension holder with a static registry, so that an asynchronous
 * reply can be routed to the thread waiting for it by key.
 *
 * A waiter registers with {@link #create(Object)} and blocks in
 * {@link #get(Predicate)}; the replying side calls
 * {@link #fireEvent(Object, Object)} with the same key.
 *
 * @param <T> type of the guarded value
 */
public class GuardedObject<T> {
    /** Length, in seconds, of each timed wait before the predicate is re-checked. */
    private static final int TIMEOUT = 1;

    /** Registry of all pending GuardedObjects, indexed by the key passed to create. */
    private static final Map<Object, GuardedObject<?>> goMap = new ConcurrentHashMap<>();

    /** The guarded value; read and written only while {@link #lock} is held. */
    private T obj;

    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();

    /**
     * Creates a GuardedObject and registers it under {@code key}, replacing any
     * instance previously registered under the same key.
     *
     * @param key registry key, later passed to {@link #fireEvent(Object, Object)}
     * @param <K> key type
     * @return the newly registered instance
     */
    public static <K> GuardedObject create(K key) {
        GuardedObject<Object> go = new GuardedObject<>();
        goMap.put(key, go);
        return go;
    }

    /**
     * Removes the GuardedObject registered under {@code key} and, if one was
     * present, publishes {@code obj} to it. Unknown keys are ignored.
     *
     * @param key registry key used in {@link #create(Object)}
     * @param obj value to hand to the waiting thread
     * @param <K> key type
     * @param <T> value type
     */
    public static <K, T> void fireEvent(K key, T obj) {
        GuardedObject<?> go = goMap.remove(key);
        if (go != null) {
            // The registry cannot record each entry's value type, so this single cast
            // is unchecked; the caller is responsible for passing a matching type.
            @SuppressWarnings("unchecked")
            GuardedObject<T> typed = (GuardedObject<T>) go;
            typed.onChange(obj);
        }
    }

    /**
     * Blocks until the guarded value satisfies {@code p}, then returns it.
     *
     * There is no overall deadline: each wait lasts at most {@link #TIMEOUT}
     * seconds, after which the predicate is simply evaluated again.
     *
     * @param p condition the guarded value must satisfy; it is also evaluated
     *          against the initial {@code null} value
     * @return the value that satisfied {@code p}
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         waiting thread is interrupted; the interrupt flag is restored first
     */
    public T get(Predicate<T> p) {
        lock.lock();
        try {
            // Recommended MESA-monitor form: re-check the condition in a loop after every wake-up.
            while (!p.test(obj)) {
                done.await(TIMEOUT, TimeUnit.SECONDS);
            }
            // Read the result while still holding the lock, so the returned value is
            // exactly the one that passed the predicate.
            return obj;
        } catch (InterruptedException e) {
            // await() cleared the interrupt status; restore it before translating the exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Event notification: stores the new value and wakes every thread waiting in
     * {@link #get(Predicate)}.
     *
     * @param obj the new guarded value
     */
    public void onChange(T obj) {
        lock.lock();
        try {
            this.obj = obj;
            done.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
public class FileBrowser { // 发送消息 private void send(Message message) { } // MQ消息返回后调用该方法 public void onMessage(Message message) { // 唤醒等待的线程 GuardedObject.fireEvent(message.getId(), message); } public Response handleWebReq() { Long id = 1L; Message message = new Message(id, "123"); GuardedObject go = GuardedObject.create(id); // 发送消息 send(message); // 等待MQ消息 go.get(Objects::nonNull); return new Response(); } } @Data @AllArgsConstructor class Message { private Long id; private String content; } class Response { }