由于在提高 CPU 频率上遇到的瓶颈,当前多核架构已经成为提高 CPU 计算能力的主要方式,并且在可以预见的时间内,一台计算机尤其是大型机可以拥有的核数乃至 CPU 数仍将不断提升。
然而传统的软件技术在处理并发问题能力上的滞后性却使得多核硬件技术无法充分发挥其优势。传统的软件开发语言通常使用线程来处理并发问题,这种做法存在一些固有的弊端,比如:
即便我们的开发人员有足够的能力来处理好上述问题,我们也要为此付出额外的代价:
本文介绍了如何运用开源的 ZeroMQ 来解决上述问题,并且详述了 ZeroMQ 在主流云技术 Openstack 中的运用,给云计算开发人员解决此类问题提供思路和参考。
理想化的并发应用必须能够适应任何数量的 CPU 核数,杜绝锁的使用,在多数情况下,其编程实践应该与单线程开发一致而无需额外的繁琐同步处理。Erlang 语言为我们提供一个范例,它具备如下优点:
(注:这里的进程与操作系统进程并无直接对应关系)
这是一种与多线程机制截然不同的并发策略,其关键在于通过消息传递 (Messaging) 来实现进程间的通信而非共享内存数据。进程首先将消息发送给消息队列,后者再将消息传递给目标进程,这一过程无需使用锁来进行同步。
然而 Erlang 的受众仍十分有限,需要有一种受众更广,更易于使用的技术,来最大限度的发挥多核硬件的能力。这正是创立 ZeroMQ 的初衷,它在继承了 Erlang 优秀的并发特性的基础上,更具备如下优势:
ZeroMQ(也拼写作ØMQ,0MQ 或 ZMQ) 是一个为可伸缩的 分布式 或并发应用程序设计的高性能异步消息库。它提供一个 消息队列 , 但是与 面向消息的中间件 不同,ZeroMQ 的运行不需要专门的 消息代理 ( message broker )。该库设计成常见的 套接字 风格的 API 。ZeroMQ 是由 iMatix 公司和大量贡献者组成的社群共同开发的。ZeroQ 通过许多第三方软件支持大部分流行的编程语言,从 Java 和 Python 到 Erlang 和 Haskell 。
上一节中我们提到,走出传统并发策略困境的关键在于,以消息传递取代内存数据共享来进行进程(线程)间的通信。而 ZeroMQ 正是通过为传统的 Socket 接口赋予消息队列的能力来实现这一目标。
Request-reply 是 ZeroMQ 提供的最常用的消息传递模式之一。在这种模式下,客户端进程发起请求,服务器端进程接受请求并返回响应给客户端。客户端和服务器端进程都可以有多个。
清单 1 和清单 2 实现了一个简单的“请求-应答”应用的服务器端和客户端。在 HelloWorldServer.py 中,我们首先创建了一个 socket 对象,将它绑定到一个特定的地址。一旦接受到客户端的请求,就发送内容为”World”的回复。
import zmq import time context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") while True: # 等待下一个来自客户端的请求 message = socket.recv() print "Received request: ", message # 休眠时间 time.sleep (1) # Do some 'work' # 发送回复给客户端 socket.send("World")
import zmq context = zmq.Context() # 发送给服务器端的 Socket print "Connecting to hello world server..." socket = context.socket(zmq.REQ) socket.connect ("tcp://localhost:5555") # 发送 10 个请求,每次都等待相应 for request in range (10): print "Sending request ", request,"..." socket.send ("Hello") # 收到回复 message = socket.recv() print "Received reply ", request, "[", message, "]"
从表面上看这种风格与传统的 socket 十分相似,但实际上它们有重大的差别。首先,ZeroMQ 的 socket 是面向消息的,我们从 socket 里直接获得消息字符串,而非字节流,发送亦然。其次,开发者无需关心负责底层通讯的连接的管理,这种连接可能是传统的 socket 连接,也可能基于其他协议。这些底层连接的创建,销毁,重连以及它如何确保消息被有效的发送,都由 ZeroMQ 负责管理。最后,ZeroMQ 的 socket 之间的连接不受任何限制,而传统的 socket 之间往往无法建立多对多的连接。因此,ZeroMQ 的 socket 可以被看作一个功能完善的消息队列。
REQ 类型的 socket 通常被用来发送请求,并且只有在收到第一个请求的回复之后,才能发送第二个请求。在 HelloWorldClient.py 中该 REQ 类型的 socket 只连接到了一个地址,但它也可以连接多个地址。在这种情况下,ZeroMQ 将确保消息被均匀的发送给每个地址,但每次只有一个地址会受到请求。REP 类型的 socket 用于接受请求。它必须在发送第一个请求的回复之后才能接受第二个请求。尚未来得及处理的请求按顺序被置于队列中。
REQ 和 REP 类型的 socket 在消息发送和接受的操作序列上存在严格限制,为了应对更复杂的情况,ZeroMQ 也提供了更为灵活的 socket 类型,这就是 DEALER 和 ROUTER。
DEALER 和 REQ 的区别在于,它可以按照任意的次序执行发送消息和接受消息的操作,而不必等待上一个请求的回复。同样,ROUTER 也不必等待发送上一次请求的响应完成就能接受第二个请求。此外,ROUTER 会为请求加上标识以记录最初请求者的身份。这样一来它可以将该请求发送给其他进程处理,得到返回结果后,仍可以根据消息中的身份标识将该请求准确的返回给最初请求者。因此 ROUTER 和 DEALER 可以被用来实现类似于传统消息队列架构中的消息服务器的进程。
Publish-subscribe 是用于广播消息的模式,在这种模式下发布的消息将同时发送给多个节点。它包含 PUB 和 SUB 两种 socket 类型。与 Request-reply 不同,PUB 和 SUB 都只能进行单向的消息传递。PUB 只能发送消息,而 SUB 只能接受消息。
清单 3,清单 4 是一个简单的 Publish-subscribe 模式的实现。从中我们可以看到,作为消息订阅者的 syncsub.py,将一个 SUB 类型 socket 绑定到‘tcp://localhost:5561’,这代表了一个单一的地址。而作为消息发布者的 syncpub.py,将一个 PUB 类型的 socket 绑定到‘tcp://*:5561’,这实际上匹配了多个地址。也就是说,凡是绑定到符合格式‘tcp://*:5561’地址的任何 SUB 类型的 socket 都可以接收到该 PUB 进程发布的消息。
def main(): context = zmq.Context() # 首先,连接我们的订阅 socket subscriber = context.socket(zmq.SUB) subscriber.connect('tcp://localhost:5561') subscriber.setsockopt(zmq.SUBSCRIBE, "") # 其次,跟 publisher 同步 syncclient = context.socket(zmq.REQ) syncclient.connect('tcp://localhost:5562') # 发送一个同步请求 syncclient.send('') # 等待同步的回复 syncclient.recv() # 第三,收到更新并且报告我们收到的数目 nbr = 0 while True: msg = subscriber.recv() if msg == 'END': break nbr += 1 print 'Received %d updates' % nbr if __name__ == '__main__': main()
import zmq # 我们等待 10 个 subscribers SUBSCRIBERS_EXPECTED = 10 def main(): context = zmq.Context() # 发送给客户端的 socket publisher = context.socket(zmq.PUB) publisher.bind('tcp://*:5561') # 收到信号的 socket syncservice = context.socket(zmq.REP) syncservice.bind('tcp://*:5562') # 从订阅者收到同步 subscribers = 0 while subscribers < SUBSCRIBERS_EXPECTED: # 等待同步请求 msg = syncservice.recv() # send synchronization reply syncservice.send('') subscribers += 1 print "+1 subscriber" # 广播 1M 的更新后结束 for i in range(1000000): publisher.send('Rhubarb'); publisher.send('END') if __name__ == '__main__': main()
在这一实现中,我们还使用 REQ 和 REP 类型的 socket 对 SUB 和 PUB 进程进行了同步,仅当出现 10 个 SUB 进程时,PUB 进程才会开始发送消息。
Pipeline 模式通常用于实现工作流的概念,每个进程负责整个处理流程中的一个步骤。每个步骤接受上一步的处理结果,并将自己的处理结果传递给下一步。每一步可以有多个备选进程。它包含 PUSH 和 PULL 两种 socket 类型。与 PUB 和 SUB 类型的 socket 类似,这两种 socket 都只能做单向消息传递,PUSH 只能发送消息,PULL 只能接受消息。因此通常一个进程需要同时包含这两种类型的 socket。此外,与 PUB 不同的是,PUSH 只会将消息发送给单个 PULL 节点。
这种模式仅适用于两个特定节点之间互相传递消息的场合。
在上一节中我们简要介绍了 ZeroMQ 针对各种不同应用场合提供的各种消息传递模型。然而消息队列技术发展至今,有许多成功的产品,它们同样提供了这些模型。那么 ZeroMQ 与这些传统的消息队列技术相比有什么独特的优势呢?可以说,其中最大的区别就是 ZeroMQ 弱化了消息中间件的概念。
在传统的消息队列系统中,消息中间件扮演着核心的角色。进程间并不是直接交互,而是连接到消息中间件,由消息中间件确保消息的有效传递。这种模式的优点是:
尽管如此,消息中间件的缺点也同样明显:
不难看出,不管是使用单一的消息中间件或者是完全不使用消息中间件,都不是完美的解决方案。在实际应用,往往需要采用一些折中的解决方式,比如:
由于 ZeroMQ 本身并不依赖于消息中间件,因此开发者可以根据实际情况来选择合适的消息传递模型。而传统的消息队列技术因为过度依赖现有的消息中间件产品,难以提供这种灵活性。
Openstack 作为一个开源的 IaaS 平台为人们所熟知,并在近年来伴随着云计算的兴起而成为热点。Nova 是 Openstack 中最为核心的组件,负责完成与虚机相关的各种操作。Nova 采用了多进程的架构,通过消息传递来完成各进程间的相互协作,并且提供了一个基于 ZeroMQ 的实现。
在 Nova 的诸多进程中,nova-api 负责提供统一的对外接口,nova-scheduler 负责为虚机选择合适的物理机宿主,nova-compute 负责虚机的创建和启停等操作,nova-network 和 nova-volume 分别负责虚机的网卡和存储的相关操作。这些进程之间完全依赖消息传递来进行通信。
以图 3 中的创建新虚机操作为例,首先用户将新虚机的规格参数提供给 nova-api。nova-api 将用户提供的参数组装成一个创建虚机的请求,以消息的形式发送给 nova-scheduler。nova-scheduler 根据相关策略为新的虚机选择一个合适的物理机宿主,然后发送消息给 nova-compute,要求在该物理机上按要求创建一个新的虚机。当所有操作都完成后,nova-scheduler 再将所创建虚机的信息封装成消息发送给 nova-api,最终返回给用户。
为了实现上一节中所描述的架构,Nova 利用 ZeroMQ 建立了一套消息传递机制,提供以下操作:
结合这些操作,图 3 可以重新描述如下,如图 5 所示。值得一提的是,nova-scheduler 和 nova-compute 之间的 CAST 和 NOTIFY 是两个异步的操作,即 nova-scheduler 通过 cast 操作发出部署请求后,就已经发送响应给 nova-api,此时虚机处于部署中的状态。当部署完成时,nova-compute 另行通过 NOTIFY 发送通知给 nova-scheduler,将虚机状态设置为部署完毕。此外,在创建虚机的过程中,nova-scheduler 也会发送相应的事件给其他进程,以便进行协调。
通过以上各章节,我们了解了 ZeroMQ 的来历、基本功能以及在开源 IaaS 框架 Openstack 中的应用。总而言之,ZeroMQ 为开发者提供了简单易用、功能完善的消息通讯机制,能够很好的解决多进程并发应用中各进程间通信协作的问题。同时,ZeroMQ 不依赖于消息中间件,这为开发者提供了足够的灵活性,根据实际问题的需要来构建适应性更强的应用。