第一次真正接触 Java 消息服务是在 2013 年底,当时是给中国移动做统一支付平台,当时用的就是著名的 Apache ActiveMQ ,当时觉得很有趣,一个服务队列竟然可以玩出这么多花样来。当时为了尽快的入门,还把《 Java Message Service 》给看了一遍,这对于初学者的我收获颇多。我说知道的完全实现 JMS 规范的 MOM 有 ActiveMQ/Apollo 和 HornetQ ,都是采用 Java 实现。 JMS 中说明了 Java 消息服务的两种消息传送模型,即 P2P (点对点)和 Pub/Sub (发布订阅),在约定了一些消息服务特性的同时,并提供了一套接口 API ,是否实现了该 API ,标志着 MOM 是否支持 JMS 规范, JMS 规范中定义了消息服务诸多特性,这些特性和他所面对的企业级服务场景相关,当然,这也严重限制了消息服务的吞吐量,完全实现 JMS 规范的 MOM ,性能总不会太高,而且 JMS 规范中没有涉及消息服务的分布式特性,导致大多数实现 JMS 规范的 MOM 分布式部署功能比较弱,只适合集群部署。
说到高性能消息中间件,第一个想到的肯定是 LinkedIn 开源的 Kafka ,虽然最初 Kafka 是为日志传输而生,但也非常适合互联网公司消息服务的应用场景,他们不要求数据实时的强一致性(事务),更多是希望达到数据的最终一致性。 RocketMQ 是 MetaQ 的 3.0 版本,而 MetaQ 最初的设计又参考了 Kafka 。最初的 MetaQ 1.x 版本由阿里的原作者庄晓丹开发,后面的 MetaQ 2.x 版本才进行了开源,这里需要注意一点的事, MetaQ 1.x 和 MetaQ 2.x 是依赖 ZooKeeper 的,但 RocketMQ (即 MetaQ 3.x )却去掉了 ZooKeeper 依赖,转而采用自己的 NameServer 。
ZooKeeper 是著名的分布式协作框架,提供了 Master 选举、分布式锁、数据的发布和订阅等诸多功能,为什么 RocketMQ 没有选择 ZooKeeper ,而是自己开发了 NameServer ,我们来具体看看 NameServer 在 RocketMQ 集群中的作用就明了了。
RocketMQ 的 Broker 有三种集群部署方式: 1. 单台 Master 部署; 2. 多台 Master 部署; 3. 多 Master 多 Slave 部署;采用第 3 种部署方式时, Master 和 Slave 可以采用同步复制和异步复制两种方式。下图是第 3 种部署方式的简单图:
图虽然是网上找的,但也足以说明问题,当采用多 Master 方式时, Master 与 Master 之间是不需要知道彼此的,这样的设计直接降低了 Broker 实现的复查性,你可以试想,如果 Master 与 Master 之间需要知道彼此的存在,这会需要在 Master 之中维护一个网络的 Master 列表,而且必然设计到 Master 发现和活跃 Master 数量变更等诸多状态更新问题,所以最简单也最可靠的做法就是 Master 只做好自己的事情(比如和 Slave 进行数据同步)即可,这样,在分布式环境中,某台 Master 宕机或上线,不会对其他 Master 造成任何影响。
那么怎么才能知道网络中有多少台 Master 和 Slave 呢?你会很自然想到用 ZooKeeper ,每个活跃的 Master 或 Slave 都去约定的 ZooKeeper 节点下注册一个状态节点,但 RocketMQ 没有使用 ZooKeeper ,所以这件事就交给了 NameServer 来做了(看上图)。
结论一: NameServer 用来保存活跃的 broker 列表,包括 Master 和 Slave 。
当然,这个结论百度一查就知道,我们移步到 rocketmq-namesrv 模块中最重要的一个类: RouteInfoManager ,它的主要属性如下:
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String /* topic */ , List<QueueData>> topicQueueTable ;
private final HashMap<String /* brokerName */ , BrokerData> brokerAddrTable ;
private final HashMap<String /* clusterName */ , Set<String /* brokerName */ >> clusterAddrTable ;
private final HashMap<String /* brokerAddr */ , BrokerLiveInfo> brokerLiveTable ;
private final HashMap<String /* brokerAddr */ , List<String> /* Filter Server */ > filterServerTable ;
每个属性通过名字就能清楚的知道是什么意思,之所以能用非线程安全的 HashMap ,是因为有读写锁 lock 来对 HashMap 的修改做保护。我们注意到保存 broker 的 Map 有两个,即 brokerAddrTable 用来保存所有的 broker 列表和 brokerLiveTable 用来保存当前活跃的 broker 列表,而 BrokerData 用来保存 broker 的主要新增,而 BrokerLiveInfo 只用来保存上次更新(心跳)时间,我们可以直接看看 RouteInfoManager 中扫描非活跃 broker 的方法:
// Broker Channel 两分钟过期
private final static long BrokerChannelExpiredTime = 1000 * 60 * 2 ;
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this . brokerLiveTable .entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BrokerChannelExpiredTime ) < System. currentTimeMillis ()) {
RemotingUtil. closeChannel (next.getValue().getChannel());
it.remove();
log .warn( "The broker channel expired, {} {}ms" , next.getKey(), BrokerChannelExpiredTime );
this .onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
可以看出,如果 两分钟 内都没收到一个 broker 的心跳数据,则直接将其从 brokerLiveTable 中移除,注意,这还会导致该 broker 从 brokerAddrTable 被删除,当然,如果该 broker 是 Master ,则它的所有 Slave 的 broker 都将被删除。 具体细节可以参看 RouteInfoManager 的 onChannelDestroy 方法。
结论二: NameServer 用来保存所有 topic 和该 topic 所有队列的列表。
我们注意到, topicQueueTable 的 value 是 QueueData 的 List ,我们看看 QueueData 中的属性:
private String brokerName ; // broker 的名称
private int readQueueNums ; // 读队列数量
private int writeQueueNums ; // 写队列数量
private int perm ; // 读写权限
private int topicSynFlag ; // 同步复制还是异步复制标记
所以,你几乎可以在 NameServer 这里知道 topic 相关的所有信息,包括 topic 有哪些队列,这些队列在那些 broker 上等。
结论三: NameServer 用来保存所有 broker 的 Filter 列表。
关于这一点,讨论 broker 的时候再细说。
DefaultRequestProcessor 是 NameServer 的默认请求处理器,他处理了定义在 rocketmq-common 模块中 RequestCode 定义的部分请求,比如注册 broker 、注销 broker 、获取 topic 路由、删除 topic 、获取 broker 的 topic 权限、获取 NameServer 的所有 topic 等。
在源代码中, NettyServerConfig 类记录 NameServer 中的一些默认参数,比如端口、服务端线程数等,列出如下:
private int listenPort = 8888 ;
private int serverWorkerThreads = 8 ;
private int serverCallbackExecutorThreads = 0 ;
private int serverSelectorThreads = 3 ;
private int serverOnewaySemaphoreValue = 256 ;
private int serverAsyncSemaphoreValue = 64 ;
private int serverChannelMaxIdleTimeSeconds = 120 ;
这些都可以通过启动时指定配置文件来进行覆盖修改,具体可以参考 NameServer 的启动类 N amesrvStartup 的实现(没想到 Apache 还有对命令行提供支持的 commons-cls 的包)。
现在我们再回过头来看看 RocketMQ 为什么不使用 ZooKeeper ? ZooKeeper 可以提供 Master 选举功能,比如 Kafka 用来给每个分区选一个 broker 作为 leader ,但对于 RocketMQ 来说, topic 的数据在每个 Master 上是对等的,没有哪个 Master 上有 topic 上的全部数据,所以这里选举 leader 没有意义; RockeqMQ 集群中,需要有构件来处理一些通用数据,比如 broker 列表, broker 刷新时间,虽然 ZooKeeper 也能存放数据,并有一致性保证,但处理数据之间的一些逻辑关系却比较麻烦,而且数据的逻辑解析操作得交给 ZooKeeper 客户端来做,如果有多种角色的客户端存在,自己解析多级数据确实是个麻烦事情;既然 RocketMQ 集群中没有用到 ZooKeeper 的一些重量级的功能,只是使用 ZooKeeper 的数据一致性和发布订阅的话,与其依赖重量级的 ZooKeeper ,还不如写个轻量级的 NameServer , NameServer 也可以集群部署,只有一千多行代码的 NameServer 稳定性肯定高于 ZooKeeper ,占用的系统资源也可以忽略不计,何乐而不为?当然,这些只是本人的一点理解,具体原因当然得 RocketMQ 设计和开发者来说。
原文 http://www.iteye.com/news/31789