转载

RocketMQ初探一:NameServer的作用

第一次真正接触 Java 消息服务是在 2013 年底,当时是给中国移动做统一支付平台,当时用的就是著名的 Apache ActiveMQ ,当时觉得很有趣,一个服务队列竟然可以玩出这么多花样来。当时为了尽快的入门,还把《 Java Message Service 》给看了一遍,这对于初学者的我收获颇多。我说知道的完全实现 JMS 规范的 MOM ActiveMQ/Apollo HornetQ ,都是采用 Java 实现。 JMS 中说明了 Java 消息服务的两种消息传送模型,即 P2P (点对点)和 Pub/Sub (发布订阅),在约定了一些消息服务特性的同时,并提供了一套接口 API ,是否实现了该 API ,标志着 MOM 是否支持 JMS 规范, JMS 规范中定义了消息服务诸多特性,这些特性和他所面对的企业级服务场景相关,当然,这也严重限制了消息服务的吞吐量,完全实现 JMS 规范的 MOM ,性能总不会太高,而且 JMS 规范中没有涉及消息服务的分布式特性,导致大多数实现 JMS 规范的 MOM 分布式部署功能比较弱,只适合集群部署。

说到高性能消息中间件,第一个想到的肯定是 LinkedIn 开源的 Kafka ,虽然最初 Kafka 是为日志传输而生,但也非常适合互联网公司消息服务的应用场景,他们不要求数据实时的强一致性(事务),更多是希望达到数据的最终一致性。 RocketMQ MetaQ 3.0 版本,而 MetaQ 最初的设计又参考了 Kafka 。最初的 MetaQ 1.x 版本由阿里的原作者庄晓丹开发,后面的 MetaQ 2.x 版本才进行了开源,这里需要注意一点的事, MetaQ 1.x MetaQ 2.x 是依赖 ZooKeeper 的,但 RocketMQ (即 MetaQ 3.x )却去掉了 ZooKeeper 依赖,转而采用自己的 NameServer

ZooKeeper 是著名的分布式协作框架,提供了 Master 选举、分布式锁、数据的发布和订阅等诸多功能,为什么 RocketMQ 没有选择 ZooKeeper ,而是自己开发了 NameServer ,我们来具体看看 NameServer RocketMQ 集群中的作用就明了了。

RocketMQ Broker 有三种集群部署方式: 1. 单台 Master 部署; 2. 多台 Master 部署; 3. Master Slave 部署;采用第 3 种部署方式时, Master Slave 可以采用同步复制和异步复制两种方式。下图是第 3 种部署方式的简单图:

RocketMQ初探一:NameServer的作用

图虽然是网上找的,但也足以说明问题,当采用多 Master 方式时, Master Master 之间是不需要知道彼此的,这样的设计直接降低了 Broker 实现的复查性,你可以试想,如果 Master Master 之间需要知道彼此的存在,这会需要在 Master 之中维护一个网络的 Master 列表,而且必然设计到 Master 发现和活跃 Master 数量变更等诸多状态更新问题,所以最简单也最可靠的做法就是 Master 只做好自己的事情(比如和 Slave 进行数据同步)即可,这样,在分布式环境中,某台 Master 宕机或上线,不会对其他 Master 造成任何影响。

那么怎么才能知道网络中有多少台 Master Slave 呢?你会很自然想到用 ZooKeeper ,每个活跃的 Master Slave 都去约定的 ZooKeeper 节点下注册一个状态节点,但 RocketMQ 没有使用 ZooKeeper ,所以这件事就交给了 NameServer 来做了(看上图)。

结论一: NameServer 用来保存活跃的 broker 列表,包括 Master Slave

当然,这个结论百度一查就知道,我们移步到 rocketmq-namesrv 模块中最重要的一个类: RouteInfoManager ,它的主要属性如下:

private final ReadWriteLock lock = new ReentrantReadWriteLock();

private final HashMap<String /* topic */ , List<QueueData>> topicQueueTable ;

private final HashMap<String /* brokerName */ , BrokerData> brokerAddrTable ;

private final HashMap<String /* clusterName */ , Set<String /* brokerName */ >> clusterAddrTable ;

private final HashMap<String /* brokerAddr */ , BrokerLiveInfo> brokerLiveTable ;

private final HashMap<String /* brokerAddr */ , List<String> /* Filter Server */ > filterServerTable ;

每个属性通过名字就能清楚的知道是什么意思,之所以能用非线程安全的 HashMap ,是因为有读写锁 lock 来对 HashMap 的修改做保护。我们注意到保存 broker Map 有两个,即 brokerAddrTable 用来保存所有的 broker 列表和 brokerLiveTable 用来保存当前活跃的 broker 列表,而 BrokerData 用来保存 broker 的主要新增,而 BrokerLiveInfo 只用来保存上次更新(心跳)时间,我们可以直接看看 RouteInfoManager 中扫描非活跃 broker 的方法:

// Broker Channel 两分钟过期

private final static long BrokerChannelExpiredTime = 1000 * 60 * 2 ;

public void scanNotActiveBroker() {

Iterator<Entry<String, BrokerLiveInfo>> it = this . brokerLiveTable .entrySet().iterator();

while (it.hasNext()) {

Entry<String, BrokerLiveInfo> next = it.next();

long last = next.getValue().getLastUpdateTimestamp();

if ((last + BrokerChannelExpiredTime ) < System. currentTimeMillis ()) {

RemotingUtil. closeChannel (next.getValue().getChannel());

it.remove();

log .warn( "The broker channel expired, {} {}ms" , next.getKey(), BrokerChannelExpiredTime );

this .onChannelDestroy(next.getKey(), next.getValue().getChannel());

}

}

}

可以看出,如果 两分钟 内都没收到一个 broker 的心跳数据,则直接将其从 brokerLiveTable 中移除,注意,这还会导致该 broker brokerAddrTable 被删除,当然,如果该 broker Master ,则它的所有 Slave broker 都将被删除。 具体细节可以参看 RouteInfoManager onChannelDestroy 方法。

结论二: NameServer 用来保存所有 topic 和该 topic 所有队列的列表。

我们注意到, topicQueueTable value QueueData List ,我们看看 QueueData 中的属性:

private String brokerName ;  // broker 的名称

private int readQueueNums ;  // 读队列数量

private int writeQueueNums ; // 写队列数量

private int perm ;           // 读写权限

private int topicSynFlag ;   // 同步复制还是异步复制标记

所以,你几乎可以在 NameServer 这里知道 topic 相关的所有信息,包括 topic 有哪些队列,这些队列在那些 broker 上等。

结论三: NameServer 用来保存所有 broker Filter 列表。

关于这一点,讨论 broker 的时候再细说。

DefaultRequestProcessor NameServer 的默认请求处理器,他处理了定义在 rocketmq-common 模块中 RequestCode 定义的部分请求,比如注册 broker 、注销 broker 、获取 topic 路由、删除 topic 、获取 broker topic 权限、获取 NameServer 的所有 topic 等。

在源代码中, NettyServerConfig 类记录 NameServer 中的一些默认参数,比如端口、服务端线程数等,列出如下:

private int listenPort = 8888 ;

private int serverWorkerThreads = 8 ;

private int serverCallbackExecutorThreads = 0 ;

private int serverSelectorThreads = 3 ;

private int serverOnewaySemaphoreValue = 256 ;

private int serverAsyncSemaphoreValue = 64 ;

private int serverChannelMaxIdleTimeSeconds = 120 ;

这些都可以通过启动时指定配置文件来进行覆盖修改,具体可以参考 NameServer 的启动类 N amesrvStartup 的实现(没想到 Apache 还有对命令行提供支持的 commons-cls 的包)。

现在我们再回过头来看看 RocketMQ 为什么不使用 ZooKeeper ZooKeeper 可以提供 Master 选举功能,比如 Kafka 用来给每个分区选一个 broker 作为 leader ,但对于 RocketMQ 来说, topic 的数据在每个 Master 上是对等的,没有哪个 Master 上有 topic 上的全部数据,所以这里选举 leader 没有意义; RockeqMQ 集群中,需要有构件来处理一些通用数据,比如 broker 列表, broker 刷新时间,虽然 ZooKeeper 也能存放数据,并有一致性保证,但处理数据之间的一些逻辑关系却比较麻烦,而且数据的逻辑解析操作得交给 ZooKeeper 客户端来做,如果有多种角色的客户端存在,自己解析多级数据确实是个麻烦事情;既然 RocketMQ 集群中没有用到 ZooKeeper 的一些重量级的功能,只是使用 ZooKeeper 的数据一致性和发布订阅的话,与其依赖重量级的 ZooKeeper ,还不如写个轻量级的 NameServer NameServer 也可以集群部署,只有一千多行代码的 NameServer 稳定性肯定高于 ZooKeeper ,占用的系统资源也可以忽略不计,何乐而不为?当然,这些只是本人的一点理解,具体原因当然得 RocketMQ 设计和开发者来说。

原文  http://www.iteye.com/news/31789
正文到此结束
Loading...