转载

实践,用tornado实现自定义协议server

前言

俗话说"光说不练假把式",上一篇文里都只是光看着别人的源码说,貌似有点纸上谈兵的意思.

所以这次写一个简单的,自己定义协议的 server .

既可以熟悉 Future和coroutine 的用法,又可以在去除了复杂的http协议后,了解 tornado 的工作原理.

代码不多,加上空行和import也就200行不到.

在github上的源码点这里

目标

  • 1.定义一个简单的协议,达到远程调用的效果,并且是个长连接的协议(类似 websock )
  • 2.模仿 tornado 的框架模式开发这个 server 框架,让用户代码开发方便,并且支持 coroutine
  • 3.为了省去客户端的开发,客户端使用 telnet

协议

  • 1.客户端连接成后,以换行符分割每次通信内容
  • 2.第一次通信内容是需要执行 handler 名称,第二次通信的内容是该 handler 的方法名
  • 3.对于客户端的主动 close ,需等待此连接所有的异步操作完成后才关闭连接
    最后运行方式如下图:

实践,用tornado实现自定义协议server

源码说明

总览

因为想让代码尽量少,所以委托模式没有严格按照设计模式的规范写,直接忽略掉了 interface 的定义.严格来说是需要定义 interface 和判断传入参数的类型的(泛华)

这是类的实例关系图(也不知道是不是这样画...)

实践,用tornado实现自定义协议server

MyServer和MyApplication的实例常驻.一个连接进来后就会创建图中其他的实例各一个.

异步说明

  • 1.为了达到目标中的第一点,需要一个while循环,读取了客户端数据后,执行 handler ,

    立即继续读取下一条客户端数据.直到客户端关闭操作,引发 StreamClosedError 才退出循环

  • 2.为了达到目标中的第二点,判断handler的返回值,如果类型是Future则yield处理,因为本方法有 @gen.coroutine ,所以 yield 就代表异步操作是在 gen.Runner 中执行的.

  • 3.为了达到目标中的第三点,需要记录每一个异步操作,并且异步操作完成后移除.当客户端主动关闭连接时,需判断是否还有 future 未完成.所以close代码中给每个 future 加上 done_callback ,用以检查关闭

详情见代码 MyServerConnection._server_request_loop

@gen.coroutine  def _server_request_loop(self, delegate):   try:    #get request adepter    request_delegate = delegate.on_request(self)    while True:     try:      message_future = self.stream.read_until_regex(b"/n/r?")      message = yield message_future      message = self._parse_data(message)     except (iostream.StreamClosedError,        iostream.UnsatisfiableReadError):      app_log.error(' close the connect')      self.close()      return     except Exception:      gen_log.error("Uncaught exception", exc_info=True)      self.close()      return     ret = request_delegate.on_message(message)     #如果是异步执行的方法,保存future,用于确保close时,所有future都已完成     if isinstance(ret, Future):      ret.add_done_callback(lambda f:self._serving_futures.remove(f))      self._serving_futures.append(ret)   finally:    delegate.on_close(self)  
 def close(self):   def mayby_close(f):    futures = self._serving_futures+self._pending_writes    app_log.error(futures)    if not any(futures):     self.stream.close()   pending_futrues = self._serving_futures+self._pending_writes   if any(pending_futrues):    map(lambda f:f.add_done_callback(mayby_close),pending_futrues)   else:    self.stream.close()  

关于@coroutine

其实用 @coroutine 的时候只需要记住几点就行了

* 1.被包装的函数(方法),返回值是 Future ,

* 2.被包装的函数走完最后一行代码后,返回的 Futurecallback 就会被运行(因为在 Runner 中引发了 StopIteration 错误,被 set_result 了)

* 3.被包装的函数是在 gen.Runner 中运行的,而 Runner 是在 ioloop (callback那块)中运行的

总结

代码非常简单,因为 tornado 为我们提供了异步的库( tornado 真强大,协程好厉害!!),并且是单进程的编程,不需要考虑锁,写起来就更轻松了.

最后附上程序效果图

实践,用tornado实现自定义协议server

废话

这只是个吃饱撑着的程序,一点实际作用都没啊(好想被拍死!).吃饱撑着的原因是我还没下决心去找工作...工作太难找啦(哭~~)!!!!好想被带走.................

正文到此结束
Loading...