tornado 源码自带了丰富的 demo ,这篇文章主要分析 demo 中的聊天室应用: chatdemo
首先看 chatdemo 的目录结构:
├── chatdemo.py ├── static │ ├── chat.css │ └── chat.js └── templates ├── index.html ├── message.html └── room.html
非常简单,基本没有分层,三个模版一个 js 一个 css ,还有一个最重要的 chatdemo.py
本文的重点是弄清楚 chatdemo.py 的运行流程,所以对于此项目的其他文件,包括模版及 chat.js 的实现都不会分析,只要知道 chat.js 的工作流程相信对于理解 chatdemo.py 没有任何问题
此 demo 主要基于长轮询。 获取新消息的原理:
在 chat.js 中有一个定时器会定时执行 update 操作
当没有新消息时 tornado 会一直 hold 住 chat.js 发来的 update 请求
当有新消息时 tornado 将包含新消息的数据返回给所有 hold 的 update 请求
此时 chat.js 收到 update 回复后更新返回数据在聊天室中,同时再进行一次 update 请求, 然后又从 1. 开始执行。
发送新消息的原理:
输入消息, 点击 post 按钮, chat.js 获取表单后用 ajax 方式发送请求 new
tornado 收到请求 new ,返回消息本身, 同时通知所有 hold 住的 update 请求 ( 这里也包括发送 new 请求的 chat.js 所发送的 update 请求 ) 返回新消息
所有在线的 chat.js 收到 update 请求回复,更新返回信息到聊天室,同时再进行一次 update 请求。
清楚了以上流程,我们直接来看 chatdemo.py :
def main(): parse_command_line() app = tornado.web.Application( [ (r"/", MainHandler), (r"/a/message/new", MessageNewHandler), (r"/a/message/updates", MessageUpdatesHandler), ], cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__", template_path=os.path.join(os.path.dirname(__file__), "templates"), static_path=os.path.join(os.path.dirname(__file__), "static"), xsrf_cookies=True, debug=options.debug, ) app.listen(options.port) tornado.ioloop.IOLoop.current().start() if __name__ == "__main__": main()
main 函数主要用作初始化应用、监听端口以及启动 tornado server 。我们看路由:
主页对应 MainHandler
new 请求对应 MessageNewHandler
updates 请求对应 MessageUpdatesHandler
下面来看 MainHandler :
# Making this a non-singleton is left as an exercise for the reader. global_message_buffer = MessageBuffer() class MainHandler(tornado.web.RequestHandler): def get(self): self.render("index.html", messages=global_message_buffer.cache)
只有一行代码,就是渲染并返回 index.html,渲染的附加信息就是 global_message_buffer 的所有缓存消息。 global_message_buffer 是 MessageBuffer 的一个实例。 我们先不关心 MessageBuffer 内部是什么,现在我们只要记住它主要是 用来储存聊天消息和连接到此聊天室的人的信息的类 。 其中 MessageBuffer().cache 就是保存聊天室所有聊天消息的结构。
然后来看 MessageNewHandler :
class MessageNewHandler(tornado.web.RequestHandler): def post(self): message = { "id": str(uuid.uuid4()), "body": self.get_argument("body"), } # to_basestring is necessary for Python 3's json encoder, # which doesn't accept byte strings. message["html"] = tornado.escape.to_basestring( self.render_string("message.html", message=message)) if self.get_argument("next", None): self.redirect(self.get_argument("next")) else: self.write(message) global_message_buffer.new_messages([message])
同样很简单,从 post 信息里获取发来的新消息 ( body ) ,然后给消息分配一个唯一的 uuid,接着把这段消息渲染成一段 html ,然后 self.write(message)
返回这段 html, 同时给 global_message_buffer ( MessageBuffer 的实例 ) 添加这条新信息。 这里其实我更倾向于返回 json 之类的数据,这样更加直观和规范,可能写 demo 的人考虑到读者对 json 之类的协议可能不熟悉故而选择了返回渲染好的 html 直接让 chat.js append 到 index.html 里。
接着来看 MessageUpdatesHandler :
class MessageUpdatesHandler(tornado.web.RequestHandler): @gen.coroutine def post(self): cursor = self.get_argument("cursor", None) # Save the future returned by wait_for_messages so we can cancel # it in wait_for_messages self.future = global_message_buffer.wait_for_messages(cursor=cursor) messages = yield self.future if self.request.connection.stream.closed(): return self.write(dict(messages=messages)) def on_connection_close(self): global_message_buffer.cancel_wait(self.future)
重点就在这里,可以看到其内部的 post 方法被 gen.coroutine 修饰器修饰,也就是说这个 post 方法现在是 协程 ( coroutine ) 方式工作。 对于协程比较陌生的童鞋,你可以直接把它当作是 单线程解决 io ( 网络请求 ) 密集运算被阻塞而导致低效率的解决方案 。 当然这样理解协程还比较笼统,之后我会详细写一篇关于协程的文章,但在这里这样理解是没有问题的。
现在来看代码内容,首先获取 cursor ,一个用来标识我们已经获取的消息的指针,这样 tornado 就不会把你已经获取的消息重复的发给你。 然后调用 global_message_buffer.wait_for_messages(cursor=cursor)
获取一个 future 对象。 future 对象是 tornado 实现的一个特殊的类的实例,它的作用就是 包含之后 ( 未来 ) 将会返回的数据 ,我们现在不用关心 Future() 内部如何实现,只要记住上面它的作用就行。 关于 Future 的解读我会放到阅读 Future 源码时讲。
然后看最关键的这句: messages = yield self.future
注意这里的 yield 就是 hold updates 请求的关键,它到这里相当于 暂停了整个 post 函数 ( updates 请求被 hold )同时也相当于 updates 这次网络请求被阻塞,这个时候协程发挥作用,把这个函数暂停的地方的所有信息保存挂起,然后把工作线程释放,这样 tornado 可以继续接受 new、 updates 等请求然后运行相应的方法处理请求。
当有新的消息返回时, tornado 底层的 ioloop 实例将会调用 gen.send(value) 返回新消息( value )给每个被暂停的方法的 yield 处, 此时协程依次恢复这些被暂停的方法, 同时用获得的返回消息继续执行方法, 这时 messages = yield self.future
继续执行,messages 获得 yield 的返回值 value ( python 中调用 gen.send(value) 将会把 value 值返回到 yield 处并替换原前 yield 后的值 ),然后判断下用户是否已经离开,如果还在线则返回新消息。
明白了以上流程,我们最后来看 MessageBuffer :
class MessageBuffer(object): def __init__(self): self.waiters = set() self.cache = [] self.cache_size = 200 def wait_for_messages(self, cursor=None): # Construct a Future to return to our caller. This allows # wait_for_messages to be yielded from a coroutine even though # it is not a coroutine itself. We will set the result of the # Future when results are available. result_future = Future() if cursor: new_count = 0 for msg in reversed(self.cache): if msg["id"] == cursor: break new_count += 1 if new_count: result_future.set_result(self.cache[-new_count:]) return result_future self.waiters.add(result_future) return result_future def cancel_wait(self, future): self.waiters.remove(future) # Set an empty result to unblock any coroutines waiting. future.set_result([]) def new_messages(self, messages): logging.info("Sending new message to %r listeners", len(self.waiters)) for future in self.waiters: future.set_result(messages) self.waiters = set() self.cache.extend(messages) if len(self.cache) > self.cache_size: self.cache = self.cache[-self.cache_size:]
初始化方法中 self.waiters 就是一个等待新消息的 listener 集合 ( 直接理解成所有被 hold 住的 updates 请求队列可能更清晰 )
self.cache 就是储存所有聊天消息的列表, self.cache_size = 200
则定义了 cache 的大小 是存 200 条消息。
然后先来看简单的 new_messages:
遍历 waiters 列表,然后给所有的等待者返回新消息,同时清空等待者队列。 然后把消息加到缓存 cache 里,如果缓存大于限制则取最新的 200 条消息。这里只要注意到 future.set_result(messages)
就是用来给 future 对象添加返回数据 ( 之前被 yield 暂停的地方此时因为 set_result() 方法将会获得 "未来" 的数据 ) 这一点即可。
然后来看 wait_for_messages :
def wait_for_messages(self, cursor=None): # Construct a Future to return to our caller. This allows # wait_for_messages to be yielded from a coroutine even though # it is not a coroutine itself. We will set the result of the # Future when results are available. result_future = Future() if cursor: new_count = 0 for msg in reversed(self.cache): if msg["id"] == cursor: break new_count += 1 if new_count: result_future.set_result(self.cache[-new_count:]) return result_future self.waiters.add(result_future) return result_future
首先初始化一个 Future 对象,然后根据 cursor 判断哪些消息已经获取了哪些还没获取,如果缓存中有对于这个 waiter 还没获取过的消息,则直接调用 set_result() 返回这些缓存中已有的但对于这个 waiter 来说是新的的数据。 如果这个 waiter 已经有缓存中的所有数据,那么就把它加到等待者队列里保持等待,直到有新消息来时调用 new_messages 再返回。
而最后一个 cancel_wait 就很简单了,当有用户退出聊天室时,直接从 self.waiters 中移除他所对应的等待者。
当明白了整个代码的运行流程后,我们可以基于这个简单的 demo 而写出更加丰富的例子,比如加入 session ,做登陆、做好友关系,做单聊做群聊等等。
chatdemo with room 是我添加的一个简单功能,输入聊天室房号后再进行聊天,只有同一房间中的人才能收到彼此的消息。
以上就是鄙人对整个 chatdemo.py 的解读。 在阅读此 demo 时,我没有参考其他源码解读,只是通过阅读 tornado 底层的源码而得出的个人的理解,因此肯定会有很多理解不成熟甚至错误的地方,还望大家多多指教。
原文地址
作者: rapospectre