转载

知乎live全文搜索之构建基于asyncio+sanic的RESTful API服务

今天的文章图片就是这个知乎Live全文搜索的效果了。如果在wifi情况下或者土豪不介意流量的同学可以直接 感受实际使用的动态效果 。

在我的《Python Web开发实战》一书中,比较少的介绍到豆瓣自己造的轮子。有读者喷「不够实战」。我显然承认,也很苦闷 ,今天顺便来吐个槽。我刚工作的时候特别愿意混各种技术会议和活动。经常有一些专家在上面讲:我们自己实现了一个XXX,它有如下多的特性,现在支持了多少个产品线的多少个应用,每天的数据量YY PB,流量ZZ…

通篇在讲架构,摆几张高大上的图,甚至能说几个大家不了解的新的玩法都很少,还不断的问坐在下面的领导或者法务:额,这个我能说嘛;那个我能分享么?最最重要的是,他们讲的这些东西大多不是开源的…. 也基本没有一个可以拿得出手的论文,甚至说白了,如果你正好专注这一部分,会发现它也是根据了FLAG公司的论文在造轮子罢了,说不定造的还不如你。

听过之后,也没有收获,都是「别人家」的,甚至有种炫耀的感觉而已,至于有什么苦可能只有他们自己知道吧。我就越来越不愿意参加这种会议了。

写书或者博客也是这样。一切东西脱离了公司能提供的基础设施和环境都是空谈,但是这些铺垫说起来就太大了,先不讨论公司有没有授权你在外面说,就是没开源这点就不好弄。说的人都是在虚化的讲一大坨的东西,最多来几个截图之类的(应该还打了马赛克)。等这一大坨东西说的让大家明白了,一本书的厚度肯定不够。但是这些内容呢,只是你为了写书的某一(几)章做铺垫而已,有些内容太专太偏,读者大部分场景下是用不到的,好吧,又得骂娘说你这本书不实用…

今天我将基于我过往的实践,以及最近学习asyncio和ES知识完成一个小程序的API服务。看这篇文章前推荐阅读相关的如下文章:

  1. 知乎Live全文搜索之模型设计和爬虫实现
  2. 知乎Live全文搜索之模型接口
  3. 使用Python进行并发编程-我为什么不喜欢Gevent
  4. 使用Python进行并发编程-asyncio篇(一)
  5. 使用Python进行并发编程-asyncio篇(二)
  6. 使用Python进行并发编程-asyncio篇(三)
  7. 知乎Live全文搜索之让elasticsearch_dsl支持asyncio

技术选型

  1. Sanic ,基于Python 3.5+的异步web服务器,和Flask一样使用装饰器作为路由,支持Blueprint。效果确实非常快。
  2. uvloop ,Sanic默认使用uvloop,这个实现基于libuv,比asyncio默认的loop的块很多。
  3. marshmallow ,一个轻量级的转化复杂对象成为Python自带数据类型的库,为什么用它未来会详细介绍。

使用Schema

了解过关系型数据库如MySQL的同学会比较熟悉Schema这个词,它是对数据库的结构描述。Schema定义了表与表和表与字段之间的关系。在使用关系型数据库之前第一件事要先定Schema,创建表(库)再去操作。

为什么我们在实现RESTful API的时候要考虑使用Schema呢?

  1. 首先API服务是给外部用的,比如移动端(安卓、IOS等),前端(用AJAX)等。那么大家一开始就要协商好那些字段,以及字段的类型。因为你不关系,他们都是关心的,如果设计有问题会由于没有正确处理而造成移动端闪退等严重问题。
  2. 你需要对API返回的数据进行良好的管理和验证。

marshmallow把一组数据映射成一个类:

from marshmallow import Schema, fields

classUserSchema(Schema):
    id = fields.Integer()
    url = fields.Str()
    name = fields.Str() 
    ...

这样既让不同编程语言的开发者一目了然,也能检验你提供的数据是不是按照这个定好的结构返回的。

同一组数据可以定义多种Schema

思考一下,在搜索页面,每一项提供的空间有限,你无法把Live的全部信息(比如「描述」这种很长的内容的字段)都展示出来,也就是就算都返回了,其实只用了一部分字段,这造成了更多的网络延时和带宽消耗。但是在Live详情页理论上就可以展示全部的字段的内容了。

再假设下,如果是返回一部分,还是返回全部字段在后端做,就是不同的方法返回时对to_dict方法对一对if/elif/else的处理,其实看起来很乱。我是这样用的:

classUserSchema(Schema):
    id = fields.Integer()
    url = fields.Str()
    name = fields.Str()
    ...
    
    
classUserFullSchema(UserSchema):
    lives_url = fields.Str()
    speaker_id = fields.Str() 
    ...

也就是定义了多种User的schema,按需选择。但是后端统一使用to_dict方法返回全部数据,在视图渲染的时候进行筛选。这样需要用一种好的表达方式:

from views.utils import marshal_with

@bp.route('/search')
@marshal_with([LiveSchema, UserSchema])
async defsearch(request):
    ...


@bp.route('/suggest')
@marshal_with(LiveSchema)
async defsuggest(request):
    ...
    

@bp.route('/user/<user_id>')
@marshal_with([LiveFullSchema, UserFullSchema])
async defuser(request, user_id):
    ...

我们先不考虑视图内的逻辑,简单的理解成他们是单个live的to_dict结果或者多个live的to_dict结果的列表

通过神奇的marshal_with装饰器传入你希望返回符合那种Schema的数据。

现在揭晓一下:

defmarshal(data, fields):
    schemas = [field() for field in fields]
    if isinstance(data, (list, tuple)):
        return [marshal(d, fields) for d in data]

    type = data.get('type')
    for schema in schemas:
        if type in schema.__class__.__name__.lower():
            result, errors = schema.dump(data)
            if errors:
                for item in errors.items():
                    print('{}: {}'.format(*item))
            return result


classmarshal_with(object):
    def__init__(self, fields):
        if not isinstance(fields, list):
            fields = [fields]
        self.fields = fields

    def__call__(self, f):
 @wraps(f)
        async defwrapper(*args, **kwargs):
            resp = await f(*args, **kwargs)
            return marshal(resp, self.fields)
        return wrapper

这个是async版本的,大家可以自己发散成Python 2的普通版。记得之前我们在model里面给每种数据的to_dict方法里面加了个 {'type': 'live'} 这种键值么,除了在小程序里面分辨数据的类型,在这里也是用来匹配那种schema的。举个例子,假如是一个user类型的数据。

@marshal_with([LiveFullSchema, UserFullSchema]) 的装饰下,由于UserFullSchema类名包含了live所以符合了。这比较黑科技一点..

深入使用marshmallow

marshmallow除了生成一个可读性很好的类和验证该字段是不是类型符合以外,还支持序列化和反序列化的处理。有什么意义呢。假如如下的schema:

classLiveSchema(Schema):
    starts_at = fields.Date()

注意我们model存的starts_at是一个datetime类型的对象,无法被json序列化,所以返回之前应该先转化成字符串:

classLiveSchema(Schema):
    starts_at = fields.Method('get_start_time')
    
    defget_start_time(self, obj):
         return int(obj['starts_at'].strftime('%s'))

再举个例子:

classUserSchema(Schema):
    bio = fields.Str()
    headline = fields.Str() 
    description = fields.Str()

bio/headline/description这三个字段内容都有可能比较长,需要做不同的截取:在详情页显示全部,在搜索页之前显示前2行.. 我们需要一个truncate函数:

WIDTH = 45

deftruncate_utf8(str, width=WIDTH):
    return str[:width] + '...' if len(str) > width else str

假如使用fields.Method你就要写三方法,因为它指定的方法只有 self, obj 2个参数,而且Schema是不能继承的。这么办呢?有我的书中介绍的partialmethod可完美实现:

from functools import partialmethod

classItem(object):
    deftruncate(self, attr, obj):
        if attr not in obj:
            return ''
        return truncate_utf8(obj[attr], WIDTH)


classUserSchema(Schema):
    bio = fields.Method('truncate_bio')
    headline = fields.Method('truncate_headline')
    description = fields.Method('truncate_description')
    truncate_headline = partialmethod(Item.truncate, 'headline')
    truncate_bio = partialmethod(Item.truncate, 'bio')
    truncate_description = partialmethod(Item.truncate, 'description')

通过partialmethod + 非继承至Schema的类就可以实现继承和额外参数了。

对Sanic定制

我们都知道,当你有独特的需求而框架不满足的时候,就要对其进行二次开发或者封装。我一般倾向基于框架提供的灵活性去封装。由于在多个API上都有分页的需要,参数中会出现start/limit(当然你可以更喜欢其他的词汇)。如果不定制,那么在每个视图里面都要加一句:

@bp.route('/search')
@marshal_with([LiveSchema, UserSchema])
async def search(request): 
    start = request.args.get('start', 0)
    limit = request.args.get('limit', 10) 
    ...

这2句就是个累赘。怎么做呢? 利用sanic提供的中间件就好了:

@app.middleware('request')
async defhalt_request(request):
    request.start = request.args.get('start', 0)
    request.limit = request.args.get('limit', 10)

但是这还不够,因为sanic在创建Request的时候基于内存的考虑使用了__slots__,所以我们要重新写on_headers_complete方法:

from sanic.server import HttpProtocol, CIMultiDict
from sanic.request import Request as _Request 


classRequest(_Request):
    __slots__ = (
        'url', 'headers', 'version', 'method', '_cookies',
        'query_string', 'body', 'start', 'limit',
        'parsed_json', 'parsed_args', 'parsed_form', 'parsed_files',
    )


classJSONHttpProtocol(HttpProtocol):
    defon_headers_complete(self):
        remote_addr = self.transport.get_extra_info('peername')
        if remote_addr:
            self.headers.append(('Remote-Addr', '%s:%s' % remote_addr))

        self.request = Request(
            url_bytes=self.url,
            headers=CIMultiDict(self.headers),
            version=self.parser.get_http_version(),
            method=self.parser.get_method().decode()
        )
        
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8300, protocol=JSONHttpProtocol,
            workers=4, debug=True)

这样在视图中就可以直接使用request.start和request.limit了。

其次看官方用法,都是在视图中控制返回的内容的类型,比如:

from sanic.response import json

@app.route('/')
async deftest(request):
    return json({"hello": "world"})

我也希望封装全部的返回结果的格式为:

{'rs': data}

PS: 当然生产环境中应该还有一个error字段甚至error_code字段标识如果出错的信息和类型等字段,我这里作为演示就保留了一个rs字段

这个data就是实际的视图返回的结果,但是在响应的时候已经封好了。可以继续重写write_response方法:

classJSONHttpProtocol(HttpProtocol):
    defwrite_response(self, response):
        if isinstance(response, str):
            response = text(response)
        elif isinstance(response, (list, dict)):
            response = {'rs': response}
        if isinstance(response, dict):
            response = json(response)

        return super().write_response(response)

这样返回的结果就很统一了。

至此,一个异步的、风格明确的、功能满足需要的API服务就完成了。

项目具体代码可见: https://github.com/dongweiming/weapp-zhihulive

原文  http://www.dongwm.com/archives/构建基于asyncio sanic的RESTfulAPI服务/
正文到此结束
Loading...