Purpose of this article:
1. How synchronous and asynchronous sending work
2. A brief look at RocketMQ's architecture design
Apache RocketMQ is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.
High availability, high performance
```java
DefaultMQProducer producer = new DefaultMQProducer("SYNC_PRODUCER_GROUP");
// Set the NameServer address
producer.setNamesrvAddr("localhost:9876");
// Initialize only once, before sending
producer.start();
// Build the message
Message msg = new Message("SYNC_MSG_TOPIC", "TagA", ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET));
// Send a synchronous message
SendResult sendResult = producer.send(msg);
```
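Check the configuration, mainly whether the producerGroup property is valid.
Create the MQClientInstance. MQClientInstance wraps RocketMQ's network-handling API; it is the network channel through which producers and consumers talk to the NameServer and the Brokers.
Register the current producer with the MQClientInstance, so that it can later issue network requests, send heartbeats, and so on.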
```java
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified, looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}
```
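This method does quite a lot. First, `this.mQClientAPIImpl.start()`:
`org.apache.rocketmq.remoting.netty.NettyRemotingClient#start` uses the remoting module (RocketMQ's Netty-based client implementation) to set up the network layer. Simply put, once this call completes, the client is able to communicate with the servers.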
Next, let's look at `startScheduledTask`, the beating heart of the client:
```java
private void startScheduledTask() {
    // If no NameServer address is configured, fetch it periodically so that it never stays
    // empty and later calls keep working
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    // Periodically refresh topic routes from the NameServer into this client instance
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    // Periodically clean up offline Brokers and send heartbeats to keep connections alive
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    // Periodically persist all consumer offsets
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    // Periodically adjust the thread pool
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}
```
Next, the pull-message service is started.
It keeps pending pull requests in the `pullRequestQueue` and processes them from there.
Then `this.rebalanceService.start()`:
internally, it periodically runs load balancing (rebalance).
Send heartbeats to all Brokers, holding a lock while doing so.
Register the related shutdown hooks.
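Main steps of the start() flow
The green blocks are the core steps, and the discussion below focuses on them. I won't paste the detailed code here, to keep the article from getting too long.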
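Before a message can be sent, the producer first needs the topic's routing information; only with it do we know which Broker node the message should go to.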
```java
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // Try the local cache first
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // Fetch this topic's route from the NameServer
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // Still no route: look it up through the default topic's route instead
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
```
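The steps are as follows:
`updateTopicRouteInfoFromNameServer` is how the producer updates and maintains its route cache. Internally it compares the route information fetched from the NameServer with the locally cached route to decide whether the local copy needs updating.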
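The compare-then-update idea can be sketched as below. This is a simplified illustration, not RocketMQ's code: `RouteCache` and the route shape (topic mapped to a list of broker names) are hypothetical stand-ins for the richer route objects the real client uses.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical route cache: topic -> names of the brokers hosting the topic.
final class RouteCache {
    private final Map<String, List<String>> topicRouteTable = new HashMap<>();

    /**
     * Compares the route fetched from the NameServer with the cached one and
     * replaces the cache entry only when they differ.
     *
     * @return true if the local cache was updated
     */
    synchronized boolean updateIfChanged(String topic, List<String> routeFromNameServer) {
        List<String> old = topicRouteTable.get(topic);
        if (routeFromNameServer.equals(old)) {
            return false; // unchanged: keep the cached route
        }
        // Store a copy so later changes to the caller's list don't leak into the cache.
        topicRouteTable.put(topic, new ArrayList<>(routeFromNameServer));
        return true;
    }

    synchronized List<String> get(String topic) {
        return topicRouteTable.get(topic);
    }
}
```

List equality is order-sensitive, so a real implementation would normalize (for example, sort) both routes before comparing them.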
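Actually, there is a line just before this that is also worth a look. For synchronous sends, RocketMQ has a configurable retry count: the total number of attempts is x + 1, where x is `retryTimesWhenSendFailed` (async and oneway sends get a single attempt in this loop). The loop retries as needed up to that total; when an attempt fails, it simply `continue`s to the next iteration of the for loop.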
```java
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
```
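A minimal sketch of that retry loop, assuming a hypothetical `BrokerSender` interface that throws on failure. It mirrors the structure described above (timesTotal = 1 + retries, move on to the next attempt after a failure, remember the failed broker so the next pick can avoid it), but it is an illustration, not RocketMQ's code.

```java
import java.util.List;

// Hypothetical sender used only for this sketch: returns normally on success, throws on failure.
interface BrokerSender {
    void send(String brokerName) throws Exception;
}

final class SyncRetrySketch {
    private SyncRetrySketch() {
    }

    // Makes up to 1 + retryTimesWhenSendFailed attempts; after a failure, the next
    // attempt prefers a broker other than the one that just failed.
    static String sendWithRetry(List<String> brokers, int retryTimesWhenSendFailed, BrokerSender sender) {
        int timesTotal = 1 + retryTimesWhenSendFailed;
        String lastBrokerName = null;
        Exception lastException = null;
        for (int times = 0; times < timesTotal; times++) {
            String broker = pickBroker(brokers, lastBrokerName, times);
            try {
                sender.send(broker);
                return broker; // success: stop retrying
            } catch (Exception e) {
                // Failure: remember the broker and fall through to the next attempt
                // (the equivalent of `continue` in the loop described above).
                lastException = e;
                lastBrokerName = broker;
            }
        }
        throw new IllegalStateException("send failed after " + timesTotal + " attempts", lastException);
    }

    // Round-robin by attempt number, skipping lastBrokerName if another broker exists.
    private static String pickBroker(List<String> brokers, String lastBrokerName, int times) {
        for (int i = 0; i < brokers.size(); i++) {
            String candidate = brokers.get((times + i) % brokers.size());
            if (!candidate.equals(lastBrokerName)) {
                return candidate;
            }
        }
        return brokers.get(times % brokers.size()); // only the failed broker is left
    }
}
```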
Now let's look at exactly how the message queue is selected:
```java
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // Is the fault-latency (fault avoidance) mechanism enabled?
    if (this.sendLatencyFaultEnable) {
        try {
            // sendWhichQueue is a ThreadLocal-based index of the queue this thread last sent to
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            // Loop over all message queues of the topic to find one whose Broker is healthy
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // Check whether this queue's Broker is currently available
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            // No healthy Broker found: fall back to one of the Brokers currently being avoided
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
```
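First, within a single send operation the queue-selection method may run several times; `lastBrokerName` is the Broker of the previously selected queue, i.e. the one where the last attempt failed.
On the first selection, `lastBrokerName` is null, so `sendWhichQueue` is simply incremented, taken modulo the number of message queues in the current route table, and the MessageQueue at that position is returned (the `selectOneMessageQueue()` method). If the send fails again, the next selection avoids the Broker of the previous MessageQueue; otherwise the send would be quite likely to fail again.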
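The non-latency path, `tpInfo.selectOneMessageQueue(lastBrokerName)`, can be sketched as follows. `SimpleQueue` and the plain shared `AtomicInteger` counter are simplifications assumed here (the real index is kept per thread); this is an illustration, not the library's implementation.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified message queue: broker name + queue id.
final class SimpleQueue {
    final String brokerName;
    final int queueId;

    SimpleQueue(String brokerName, int queueId) {
        this.brokerName = brokerName;
        this.queueId = queueId;
    }

    @Override
    public String toString() {
        return brokerName + ":" + queueId;
    }
}

final class QueueSelector {
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    // First attempt (lastBrokerName == null): plain round-robin.
    // Retries: keep rotating, but skip queues that live on the broker that just failed.
    SimpleQueue select(List<SimpleQueue> queues, String lastBrokerName) {
        if (lastBrokerName == null) {
            return queues.get(Math.floorMod(sendWhichQueue.getAndIncrement(), queues.size()));
        }
        for (int i = 0; i < queues.size(); i++) {
            int pos = Math.floorMod(sendWhichQueue.getAndIncrement(), queues.size());
            SimpleQueue mq = queues.get(pos);
            if (!mq.brokerName.equals(lastBrokerName)) {
                return mq;
            }
        }
        // Every queue is on the failed broker: fall back to plain round-robin.
        return queues.get(Math.floorMod(sendWhichQueue.getAndIncrement(), queues.size()));
    }
}
```

With queues `a:0, a:1, b:0, b:1`, a retry after a failure on `a` skips `a:1` and lands on `b:0`. A fresh send (lastBrokerName null) would instead have picked `a:1`, another queue on the same Broker, which is the weakness discussed next.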
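This algorithm does avoid a faulty Broker within a single send operation, but consider a Broker that has crashed. Because the message queues in the route are ordered by Broker, if the previous selection picked the crashed Broker's first queue, the next selection picks its second queue; that send is very likely to fail too, triggering yet another retry and wasting performance. So is there a way, after one failed send, to temporarily exclude that Broker from queue selection? Some readers may ask why the route still contains a Broker that is no longer available. That is easy to explain. First, the NameServer detects that a Broker is unavailable with a delay of at least one heartbeat-check interval (10s).
Second, the NameServer does not push anything to producers as soon as it detects a Broker crash; instead, each producer refreshes its routes every 30s, so it can take up to 30s before a producer sees the latest route for the Broker. It would therefore help to have a mechanism that, while a Broker is down, temporarily removes it from queue selection after a single failed send. That is what the fault latency mechanism below does.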
```java
/**
 * Update the fault item (latency record) for a Broker.
 *
 * @param brokerName
 * @param currentLatency
 * @param isolation
 */
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}

/**
 * Compute how long the Broker should be treated as unavailable.
 *
 * @param currentLatency
 * @return
 */
private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }

    return 0;
}
```
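If `isolation` is true, 30s (30000 ms) is passed to `computeNotAvailableDuration`;
if `isolation` is false, the latency of this send is passed instead. `computeNotAvailableDuration` computes how long the Broker must be avoided because of this send failure, i.e. for how long the Broker will not take part in queue selection for sending.
The algorithm: scan the `latencyMax` array from the end, find the first index whose value is less than or equal to `currentLatency`, and take the avoidance duration at that index of `notAvailableDuration`. The result is then passed to `LatencyFaultTolerance.updateFaultItem`.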
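To make the lookup concrete, here is a standalone copy of the algorithm. The two arrays are the defaults I recall from `MQFaultStrategy` in RocketMQ 4.x (`latencyMax` = 50, 100, 550, 1000, 2000, 3000, 15000 ms; `notAvailableDuration` = 0, 0, 30 s, 60 s, 120 s, 180 s, 600 s); treat them as an assumption and check the version you run.

```java
// Standalone sketch of the lookup; the array values are assumed RocketMQ 4.x defaults.
final class FaultDuration {
    private static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    private FaultDuration() {
    }

    // Scan from the end of LATENCY_MAX for the first threshold <= currentLatency.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // Mirrors updateFaultItem: isolation forces a 30 s "latency" into the lookup.
    static long avoidanceFor(long currentLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000 : currentLatency);
    }
}
```

Under these values, a 30 ms send costs nothing, a 600 ms send benches the Broker for 30 s, a 2.5 s send for 120 s, and any isolated failure (isolation = true) for 600 s.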
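```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("SYNC_PRODUCER_GROUP");
        // Set the NameServer address
        producer.setNamesrvAddr("localhost:9876");
        // Initialize only once, before sending
        producer.start();
        for (int i = 0; i < 1; i++) {
            try {
                // Build the message
                Message msg = new Message("SYNC_MSG_TOPIC", "TagA",
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Send a synchronous message
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}
```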
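Before sending, the producer first reads from its local route table; if the route isn't there, it fetches it from the NameServer and updates the local table. In addition, the producer syncs its routes from the NameServer every 30s.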
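Sending achieves high availability in two ways: the retry mechanism and the fault avoidance mechanism.
Retry mechanism: when a send fails, it is retried, up to x + 1 attempts in total, to do everything possible to get the message out.
Fault avoidance mechanism: when an error shows up during sending (and `sendLatencyFaultEnable` is on), the Broker is given an avoidance period; during that period it is not chosen for sending, which raises the send success rate.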
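The avoidance period can be modeled as a map from Broker name to the time at which the Broker becomes usable again. The sketch below is a simplified stand-in for `LatencyFaultTolerance`, not its real implementation; the injected clock is my own addition so the behavior is easy to test.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Simplified avoidance table: brokerName -> earliest time (ms) it may be chosen again.
final class SimpleFaultTolerance {
    private final Map<String, Long> availableAfter = new ConcurrentHashMap<>();
    private final LongSupplier clock;

    SimpleFaultTolerance(LongSupplier clock) {
        this.clock = clock;
    }

    // Record a fault: the broker is avoided for notAvailableDurationMs from now.
    void updateFaultItem(String brokerName, long notAvailableDurationMs) {
        availableAfter.put(brokerName, clock.getAsLong() + notAvailableDurationMs);
    }

    // A broker with no record, or whose avoidance window has passed, is available.
    boolean isAvailable(String brokerName) {
        Long t = availableAfter.get(brokerName);
        return t == null || clock.getAsLong() >= t;
    }
}
```

In production the clock would simply be `System::currentTimeMillis`.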
```java
private void executeInvokeCallback(final ResponseFuture responseFuture) {
    boolean runInThisThread = false;
    // Prefer the configurable shared callback executor; if none is configured, or submitting
    // to it fails, run the callback in the current thread instead
    ExecutorService executor = this.getCallbackExecutor();
    if (executor != null) {
        try {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        responseFuture.executeInvokeCallback();
                    } catch (Throwable e) {
                        log.warn("execute callback in executor exception, and callback throw", e);
                    } finally {
                        responseFuture.release();
                    }
                }
            });
        } catch (Exception e) {
            runInThisThread = true;
            log.warn("execute callback in executor exception, maybe executor busy", e);
        }
    } else {
        runInThisThread = true;
    }

    if (runInThisThread) {
        try {
            responseFuture.executeInvokeCallback();
        } catch (Throwable e) {
            log.warn("executeInvokeCallback Exception", e);
        } finally {
            responseFuture.release();
        }
    }
}
```
```java
int tmp = curTimes.incrementAndGet();
if (needRetry && tmp <= timesTotal) {
    String retryBrokerName = brokerName; // by default, it will send to the same broker
    if (topicPublishInfo != null) {
        // select one message queue accordingly, in order to determine which broker to send
        MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
        retryBrokerName = mqChosen.getBrokerName();
    }
    String addr = instance.findBrokerAddressInPublish(retryBrokerName);
    log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
        retryBrokerName);
    try {
        request.setOpaque(RemotingCommand.createNewRequestId());
        sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
            timesTotal, curTimes, context, producer);
    } catch (InterruptedException e1) {
        onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
            timesTotal, curTimes, e1, context, false, producer);
    } catch (RemotingConnectException e1) {
        producer.updateFaultItem(brokerName, 3000, true);
        onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
            timesTotal, curTimes, e1, context, true, producer);
    } catch (RemotingTooMuchRequestException e1) {
        onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
            timesTotal, curTimes, e1, context, false, producer);
    } catch (RemotingException e1) {
        producer.updateFaultItem(brokerName, 3000, true);
        onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
            timesTotal, curTimes, e1, context, true, producer);
    }
} else {
    if (context != null) {
        context.setException(e);
        context.getProducer().executeSendMessageHookAfter(context);
    }

    try {
        sendCallback.onException(e);
    } catch (Exception ignored) {
    }
}
```
This part is easy to follow: if `remotingClient.invokeAsync` fails, the send is retried recursively, and the unavailable Broker is marked for avoidance.
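The recursive shape of the async retry can be sketched without RocketMQ types. `AsyncTransport`, `UserCallback`, and the broker-rotation rule are hypothetical; the point is that there is no loop. Each failure calls `sendAsync` again until the retry budget (`maxRetries`, a name chosen for this sketch) is used up, and only then does the user callback's `onException` run.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical transport: reports the outcome of one send through a callback.
interface AsyncTransport {
    void sendAsync(String broker, Runnable onSuccess, Consumer<Exception> onFailure);
}

// Hypothetical user callback, shaped loosely like RocketMQ's SendCallback.
interface UserCallback {
    void onSuccess(String broker);

    void onException(Exception e);
}

final class AsyncRetrySketch {
    private AsyncRetrySketch() {
    }

    // No loop: each failure re-invokes sendAsync until maxRetries is used up.
    static void sendAsync(List<String> brokers, AsyncTransport transport, int maxRetries,
                          AtomicInteger retries, String broker, UserCallback callback) {
        transport.sendAsync(broker,
                () -> callback.onSuccess(broker),
                e -> {
                    if (retries.incrementAndGet() <= maxRetries) {
                        // Rotate to the next broker so the failed one is not retried immediately.
                        String next = brokers.get((brokers.indexOf(broker) + 1) % brokers.size());
                        sendAsync(brokers, transport, maxRetries, retries, next, callback);
                    } else {
                        callback.onException(e); // out of retries: report to the user
                    }
                });
    }
}
```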
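The timeout semantics of asynchronous sends differ slightly from those of synchronous sends.
Where does the async send call hand control back to the caller?
In the current design, it returns as soon as the task has been submitted to the thread pool, which is the more logical choice. In the older 4.2 version, by contrast, it returned from `NettyRemotingClient.invokeAsync`, so strictly speaking that older design could not really be called asynchronous.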
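The "return right after submitting" behavior can be illustrated with plain JDK classes. In this sketch (hypothetical names, not RocketMQ's code), `sendAsync` only queues the work on an executor and returns immediately; the result reaches the caller later, through a callback running on the executor thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical async sender: sendAsync only hands the work to an executor and returns.
final class SubmitAndReturn {
    private final ExecutorService asyncSenderExecutor = Executors.newSingleThreadExecutor();

    void sendAsync(Supplier<String> doSend, Consumer<String> onSuccess, Consumer<Throwable> onException) {
        // Returns right after queuing the task; the send and the callback run on the executor thread.
        asyncSenderExecutor.execute(() -> {
            try {
                onSuccess.accept(doSend.get());
            } catch (Throwable t) {
                onException.accept(t);
            }
        });
    }

    void shutdown() {
        asyncSenderExecutor.shutdown();
    }
}
```

If `doSend` blocks (say, waiting on the network), the caller of `sendAsync` is already free to continue; in the older design described above, part of that waiting happened on the caller's thread.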
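```java
// Assumes `producer` and `msg` are set up as in the synchronous example
final int index = 0;
final CountDownLatch countDownLatch = new CountDownLatch(1);
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        countDownLatch.countDown();
        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
    }

    @Override
    public void onException(Throwable e) {
        countDownLatch.countDown();
        System.out.printf("%-10d Exception %s %n", index, e);
        e.printStackTrace();
    }
});
// Wait for the callback before shutting the producer down
countDownLatch.await(5, TimeUnit.SECONDS);
```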
At most once, at least once, exactly once
```java
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
    TopicConfigSerializeWrapper topicConfigWrapper) {
    // Register this Broker (cluster, address, name, id, HA address, topic configs, ...) with the NameServer
    List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
        this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.getHAServerAddr(),
        topicConfigWrapper,
        this.filterServerManager.buildNewFilterServerList(),
        oneway,
        this.brokerConfig.getRegisterBrokerTimeoutMills(),
        this.brokerConfig.isCompressedRegister());
    // ... handling of registerBrokerResultList omitted in this excerpt
}
```
The NameServer turns the data the Broker sends it into its own set of route tables.
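As a rough picture of "its own set of route tables", here is a toy registry. The table names borrow from RocketMQ's `RouteInfoManager` (`brokerAddrTable`, `brokerLiveTable`), but the structure is heavily simplified and the 2-minute expiry is an assumption; a real NameServer evicts expired Brokers in a background scan, whereas this sketch just filters them out on read.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy NameServer registry (hypothetical): folds broker registrations into lookup tables.
final class ToyRouteRegistry {
    // Assumed expiry window for a broker that stops re-registering (2 minutes).
    static final long BROKER_EXPIRED_MS = 120_000L;

    // brokerName -> (brokerId -> address)
    private final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    // topic -> names of the brokers hosting it
    private final Map<String, Set<String>> topicBrokerTable = new HashMap<>();
    // broker address -> time (ms) of its last registration
    private final Map<String, Long> brokerLiveTable = new HashMap<>();

    synchronized void registerBroker(String brokerName, long brokerId, String addr,
                                     Set<String> topics, long nowMs) {
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
        for (String topic : topics) {
            topicBrokerTable.computeIfAbsent(topic, k -> new HashSet<>()).add(brokerName);
        }
        brokerLiveTable.put(addr, nowMs);
    }

    // Route lookup: brokers for the topic that registered within the expiry window.
    synchronized Set<String> brokersFor(String topic, long nowMs) {
        Set<String> result = new HashSet<>();
        for (String brokerName : topicBrokerTable.getOrDefault(topic, Collections.emptySet())) {
            for (String addr : brokerAddrTable.getOrDefault(brokerName, Collections.emptyMap()).values()) {
                Long last = brokerLiveTable.get(addr);
                if (last != null && nowMs - last <= BROKER_EXPIRED_MS) {
                    result.add(brokerName);
                    break;
                }
            }
        }
        return result;
    }
}
```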
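RocketMQ achieves message redundancy through a master/slave structure: the master receives the messages sent by producers and then synchronizes them to the slave. Depending on the master's role, this synchronization happens at one of two different points in time:
SYNC_MASTER and ASYNC_MASTER transfer data to the slave in the same way; only the timing differs. When a SYNC_MASTER receives a message from a producer, it waits synchronously until the message has also been transferred to the slave, while an ASYNC_MASTER does not wait for the slave.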
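The difference in timing can be sketched with `CompletableFuture`. Everything here (`ToyMaster`, `MasterRole`, the in-memory "slave" log) is hypothetical and only shows when the producer would get its acknowledgment: a SYNC_MASTER acknowledges once the slave has the message, an ASYNC_MASTER right after its local write, with replication continuing in the background.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

// Hypothetical roles, named after the broker roles in the text.
enum MasterRole { SYNC_MASTER, ASYNC_MASTER }

// Toy master: "replicates" to an in-memory slave log via the given executor.
final class ToyMaster {
    final List<String> masterLog = new CopyOnWriteArrayList<>();
    final List<String> slaveLog = new CopyOnWriteArrayList<>();
    private final MasterRole role;
    private final Executor replicationExecutor;

    ToyMaster(MasterRole role, Executor replicationExecutor) {
        this.role = role;
        this.replicationExecutor = replicationExecutor;
    }

    // The returned future completes when the producer would receive its acknowledgment.
    CompletableFuture<Void> putMessage(String msg) {
        masterLog.add(msg); // write locally first
        CompletableFuture<Void> replicated =
                CompletableFuture.runAsync(() -> slaveLog.add(msg), replicationExecutor);
        if (role == MasterRole.SYNC_MASTER) {
            return replicated; // ack only once the slave has the message
        }
        return CompletableFuture.completedFuture(null); // ack now; replication continues in the background
    }
}
```

The trade-off is the usual one: SYNC_MASTER pays extra latency per message so an acknowledged message survives losing the master, while ASYNC_MASTER answers faster but may lose the last few messages if the master dies before replication catches up.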
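Intuitively, I/O operations are slow.
Test on a bare-bones ECS Linux instance: writing without the cache vs. writing with the cache.
Writing 819 MB took only about 7s with the cache, versus about 80s without it, and this was on a low-end machine; on a better machine the gap might be as large as 100x.
So under the first write model, if one write takes 1 ms, that is 1,000 writes per second; with the OS cache, assuming one write takes 0.01 ms, that is 100,000 writes per second. At that point, handling a hundred thousand or more writes per second is within reach.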
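A quick check of the arithmetic above (one write at a time; the per-write latencies are the article's assumed figures, the MB/s values come from the measured 819 MB run):

```latex
\begin{aligned}
\text{writes/s} &= \frac{1000\ \text{ms}}{t_{\text{write}}} \\
t_{\text{write}} = 1\ \text{ms} &\Rightarrow \frac{1000}{1} = 1000\ \text{writes/s} \\
t_{\text{write}} = 0.01\ \text{ms} &\Rightarrow \frac{1000}{0.01} = 100{,}000\ \text{writes/s} \\
\text{with cache: } \frac{819\ \text{MB}}{7\ \text{s}} \approx 117\ \text{MB/s},
&\qquad \text{without: } \frac{819\ \text{MB}}{80\ \text{s}} \approx 10.2\ \text{MB/s} \quad (\approx 11\times)
\end{aligned}
```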
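Traditional data path: the disk is read into the OS cache, then OS cache -> copy -> process buffer -> copy -> socket buffer.
Performance is already acceptable, but there are two unnecessary copies. So would getting rid of these two copies make it unbeatable? The answer: yes.
Put simply, only a descriptor of the data is copied into the socket buffer, and the data itself is sent straight from the OS cache to the NIC. This greatly improves the performance of reading file data when messages are consumed.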
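In Java, the two paths can be contrasted with `FileChannel`. `FileChannel.transferTo` lets the JDK hand the transfer to the operating system (when the target is a socket channel on Linux, this typically maps to `sendfile`), skipping the copy through a user-space buffer; for targets that don't support that, the JDK falls back to an ordinary copy. The sketch writes to an in-memory channel so it runs anywhere; a real server would pass a `SocketChannel`. Note that this shows the sendfile-style path only; RocketMQ's own storage is built largely on memory-mapped files (`MappedByteBuffer`), which this example does not cover.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class CopyPaths {
    private CopyPaths() {
    }

    // Traditional path: file -> user-space buffer -> target channel (two copies through the process).
    static long copyViaBuffer(Path file, WritableByteChannel target) throws IOException {
        long total = 0;
        try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buf = ByteBuffer.allocate(8192);
            while (in.read(buf) != -1) {
                buf.flip();
                while (buf.hasRemaining()) {
                    total += target.write(buf);
                }
                buf.clear();
            }
        }
        return total;
    }

    // transferTo path: lets the OS move the bytes (sendfile on Linux when the target is a socket).
    static long copyViaTransferTo(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = in.size();
            long position = 0;
            // transferTo may move fewer bytes than requested, so loop until the whole file is sent
            // (assumes a blocking target; a non-blocking socket would need readiness handling).
            while (position < size) {
                position += in.transferTo(position, size - position, target);
            }
            return position;
        }
    }
}
```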
If fate allows, see you in the next installment.