本文主要研究一下dubbo的MonitorFilter
dubbo-2.7.2/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java
@Activate(group = {PROVIDER, CONSUMER}) public class MonitorFilter extends ListenableFilter { private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class); private static final String MONITOR_FILTER_START_TIME = "monitor_filter_start_time"; public MonitorFilter() { super.listener = new MonitorListener(); } /** * The Concurrent counter */ private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<String, AtomicInteger>(); /** * The MonitorFactory */ private MonitorFactory monitorFactory; public void setMonitorFactory(MonitorFactory monitorFactory) { this.monitorFactory = monitorFactory; } /** * The invocation interceptor,it will collect the invoke data about this invocation and send it to monitor center * * @param invoker service * @param invocation invocation. * @return {@link Result} the invoke result * @throws RpcException */ @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { if (invoker.getUrl().hasParameter(MONITOR_KEY)) { invocation.setAttachment(MONITOR_FILTER_START_TIME, String.valueOf(System.currentTimeMillis())); getConcurrent(invoker, invocation).incrementAndGet(); // count up } return invoker.invoke(invocation); // proceed invocation chain } // concurrent counter private AtomicInteger getConcurrent(Invoker<?> invoker, Invocation invocation) { String key = invoker.getInterface().getName() + "." + invocation.getMethodName(); AtomicInteger concurrent = concurrents.get(key); if (concurrent == null) { concurrents.putIfAbsent(key, new AtomicInteger()); concurrent = concurrents.get(key); } return concurrent; } //...... }
dubbo-2.7.2/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java
class MonitorListener implements Listener { @Override public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) { if (invoker.getUrl().hasParameter(MONITOR_KEY)) { collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false); getConcurrent(invoker, invocation).decrementAndGet(); // count down } } @Override public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) { if (invoker.getUrl().hasParameter(MONITOR_KEY)) { collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true); getConcurrent(invoker, invocation).decrementAndGet(); // count down } } /** * The collector logic, it will be handled by the default monitor * * @param invoker * @param invocation * @param result the invoke result * @param remoteHost the remote host address * @param start the timestamp the invoke begin * @param error if there is an error on the invoke */ private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) { try { URL monitorUrl = invoker.getUrl().getUrlParameter(MONITOR_KEY); Monitor monitor = monitorFactory.getMonitor(monitorUrl); if (monitor == null) { return; } URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error); monitor.collect(statisticsURL); } catch (Throwable t) { logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t); } } /** * Create statistics url * * @param invoker * @param invocation * @param result * @param remoteHost * @param start * @param error * @return */ private URL createStatisticsUrl(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) { // ---- service statistics ---- long elapsed = System.currentTimeMillis() - start; // invocation cost int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count String application = invoker.getUrl().getParameter(APPLICATION_KEY); String service = invoker.getInterface().getName(); // service name String method = RpcUtils.getMethodName(invocation); // method name String group = invoker.getUrl().getParameter(GROUP_KEY); String version = invoker.getUrl().getParameter(VERSION_KEY); int localPort; String remoteKey, remoteValue; if (CONSUMER_SIDE.equals(invoker.getUrl().getParameter(SIDE_KEY))) { // ---- for service consumer ---- localPort = 0; remoteKey = MonitorService.PROVIDER; remoteValue = invoker.getUrl().getAddress(); } else { // ---- for service provider ---- localPort = invoker.getUrl().getPort(); remoteKey = MonitorService.CONSUMER; remoteValue = remoteHost; } String input = "", output = ""; if (invocation.getAttachment(INPUT_KEY) != null) { input = invocation.getAttachment(INPUT_KEY); } if (result != null && result.getAttachment(OUTPUT_KEY) != null) { output = result.getAttachment(OUTPUT_KEY); } return new URL(COUNT_PROTOCOL, NetUtils.getLocalHost(), localPort, service + PATH_SEPARATOR + method, MonitorService.APPLICATION, application, MonitorService.INTERFACE, service, MonitorService.METHOD, method, remoteKey, remoteValue, error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1", MonitorService.ELAPSED, String.valueOf(elapsed), MonitorService.CONCURRENT, String.valueOf(concurrent), INPUT_KEY, input, OUTPUT_KEY, output, GROUP_KEY, group, VERSION_KEY, version); } }
dubbo-2.7.2/dubbo-monitor/dubbo-monitor-api/src/test/java/org/apache/dubbo/monitor/support/MonitorFilterTest.java
public class MonitorFilterTest { private volatile URL lastStatistics; private volatile Invocation lastInvocation; private final Invoker<MonitorService> serviceInvoker = new Invoker<MonitorService>() { @Override public Class<MonitorService> getInterface() { return MonitorService.class; } public URL getUrl() { try { return URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880?" + APPLICATION_KEY + "=abc&" + SIDE_KEY + "=" + CONSUMER_SIDE + "&" + MONITOR_KEY + "=" + URLEncoder.encode("dubbo://" + NetUtils.getLocalHost() + ":7070", "UTF-8")); } catch (UnsupportedEncodingException e) { throw new IllegalStateException(e.getMessage(), e); } } @Override public boolean isAvailable() { return false; } public Result invoke(Invocation invocation) throws RpcException { lastInvocation = invocation; return AsyncRpcResult.newDefaultAsyncResult(invocation); } @Override public void destroy() { } }; private MonitorFactory monitorFactory = new MonitorFactory() { @Override public Monitor getMonitor(final URL url) { return new Monitor() { public URL getUrl() { return url; } @Override public boolean isAvailable() { return true; } @Override public void destroy() { } public void collect(URL statistics) { MonitorFilterTest.this.lastStatistics = statistics; } public List<URL> lookup(URL query) { return Arrays.asList(MonitorFilterTest.this.lastStatistics); } }; } }; @Test public void testFilter() throws Exception { MonitorFilter monitorFilter = new MonitorFilter(); monitorFilter.setMonitorFactory(monitorFactory); Invocation invocation = new RpcInvocation("aaa", new Class<?>[0], new Object[0]); RpcContext.getContext().setRemoteAddress(NetUtils.getLocalHost(), 20880).setLocalAddress(NetUtils.getLocalHost(), 2345); Result result = monitorFilter.invoke(serviceInvoker, invocation); result.thenApplyWithContext((r) -> { monitorFilter.listener().onResponse(r, serviceInvoker, invocation); return r; }); while (lastStatistics == null) { Thread.sleep(10); } Assertions.assertEquals("abc", lastStatistics.getParameter(MonitorService.APPLICATION)); Assertions.assertEquals(MonitorService.class.getName(), lastStatistics.getParameter(MonitorService.INTERFACE)); Assertions.assertEquals("aaa", lastStatistics.getParameter(MonitorService.METHOD)); Assertions.assertEquals(NetUtils.getLocalHost() + ":20880", lastStatistics.getParameter(MonitorService.PROVIDER)); Assertions.assertEquals(NetUtils.getLocalHost(), lastStatistics.getAddress()); Assertions.assertEquals(null, lastStatistics.getParameter(MonitorService.CONSUMER)); Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.SUCCESS, 0)); Assertions.assertEquals(0, lastStatistics.getParameter(MonitorService.FAILURE, 0)); Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.CONCURRENT, 0)); Assertions.assertEquals(invocation, lastInvocation); } @Test public void testSkipMonitorIfNotHasKey() { MonitorFilter monitorFilter = new MonitorFilter(); MonitorFactory mockMonitorFactory = mock(MonitorFactory.class); monitorFilter.setMonitorFactory(mockMonitorFactory); Invocation invocation = new RpcInvocation("aaa", new Class<?>[0], new Object[0]); Invoker invoker = mock(Invoker.class); given(invoker.getUrl()).willReturn(URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880?" + APPLICATION_KEY + "=abc&" + SIDE_KEY + "=" + CONSUMER_SIDE)); monitorFilter.invoke(invoker, invocation); verify(mockMonitorFactory, never()).getMonitor(any(URL.class)); } @Test public void testGenericFilter() throws Exception { MonitorFilter monitorFilter = new MonitorFilter(); monitorFilter.setMonitorFactory(monitorFactory); Invocation invocation = new RpcInvocation("$invoke", new Class<?>[]{String.class, String[].class, Object[].class}, new Object[]{"xxx", new String[]{}, new Object[]{}}); RpcContext.getContext().setRemoteAddress(NetUtils.getLocalHost(), 20880).setLocalAddress(NetUtils.getLocalHost(), 2345); Result result = monitorFilter.invoke(serviceInvoker, invocation); result.thenApplyWithContext((r) -> { monitorFilter.listener().onResponse(r, serviceInvoker, invocation); return r; }); while (lastStatistics == null) { Thread.sleep(10); } Assertions.assertEquals("abc", lastStatistics.getParameter(MonitorService.APPLICATION)); Assertions.assertEquals(MonitorService.class.getName(), lastStatistics.getParameter(MonitorService.INTERFACE)); Assertions.assertEquals("xxx", lastStatistics.getParameter(MonitorService.METHOD)); Assertions.assertEquals(NetUtils.getLocalHost() + ":20880", lastStatistics.getParameter(MonitorService.PROVIDER)); Assertions.assertEquals(NetUtils.getLocalHost(), lastStatistics.getAddress()); Assertions.assertEquals(null, lastStatistics.getParameter(MonitorService.CONSUMER)); Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.SUCCESS, 0)); Assertions.assertEquals(0, lastStatistics.getParameter(MonitorService.FAILURE, 0)); Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.CONCURRENT, 0)); Assertions.assertEquals(invocation, lastInvocation); } @Test public void testSafeFailForMonitorCollectFail() { MonitorFilter monitorFilter = new MonitorFilter(); MonitorFactory mockMonitorFactory = mock(MonitorFactory.class); Monitor mockMonitor = mock(Monitor.class); Mockito.doThrow(new RuntimeException()).when(mockMonitor).collect(any(URL.class)); monitorFilter.setMonitorFactory(mockMonitorFactory); given(mockMonitorFactory.getMonitor(any(URL.class))).willReturn(mockMonitor); Invocation invocation = new RpcInvocation("aaa", new Class<?>[0], new Object[0]); monitorFilter.invoke(serviceInvoker, invocation); } }
MonitorFilter继承了ListenableFilter,其invoke方法在invoker的URL中包含有monitor参数时会给invocation设置monitor_filter_start_time的attachment,然后递增当前并发的次数;其创建的listener为MonitorListener;MonitorListener实现了Listener接口,其onResponse及onError方法在invoker的URL中包含有monitor参数时会上报指标,然后递减并发次数