看这篇文章前推荐阅读相关的如下文章:
知乎Live全文搜索之让elasticsearch_dsl支持asyncio
知乎Live全文搜索之使用Elasticsearch全文搜索
知乎Live全文搜索之模型设计和爬虫实现
知乎Live全文搜索之使用Elasticsearch做聚合分析
知乎Live全文搜索之使用Elasticsearch做搜索建议
在 知乎Live全文搜索之让elasticsearch_dsl支持asyncio 一文中,我把后端工作分成了4步,今天是完成爬虫和模型接口这2步,接口返回的数据会被微信小程序使用。
详细的列一下接口需求:
搜索。搜索符合输入的关键字的Live和用户,按照之前提到的各种策略排序,也支持通过status状态过滤「已结束」和「未结束」2种类型的Live。支持分页。
搜索建议。提供符合输入的关键字的Live的建议。
发现。把全部的Live按照之前提到的各种策略排序,可以通过各种字段排序,可以选择Live开始的时间范围(默认是全部)。
获取热门话题。
获取某话题详细信息及话题下的Live,支持分页、排序、时间范围。
获取全部用户,并且可以按照举办的Live数量、更新Live时间等条件排序。
获取单个用户信息。
根据各种策略排序,获取7天热门Live,非知乎排序。
根据各种策略排序,获取30天热门Live,非知乎排序。
由于4和5的需求,我添加了Topic这个模型,上篇文章说过SQLite不支持并发,所以替换成了MySQL,要把config里面的DB_URI改成如下格式:
DB_URI = 'mysql+pymysql://localhost/test?charset=utf8mb4'
其中test是库的名字,charset要用utf8mb4,因为有些用户信息甚至Live的标题里面包含emoji。MySQL的客户端用的是PyMySQL,需要在schema上指出来。
Topic类和之前的User格式差不多,只是不同的字段,限于篇幅就不列出来了。
为了实现可以按照举办的Live数量、更新Live时间排序,我添加了2个字段,也改了字符集:
from config import SUGGEST_USER_LIMIT, PEOPLE_URL, LIVE_USER_URL class User(Base): __tablename__ = 'users' __table_args__ = { 'mysql_engine': 'InnoDB', 'mysql_charset': 'utf8mb4' } ... live_count = Column(Integer, default=0) updated_time = Column(DateTime, default=datetime.now)
接着添加一些必要的方法:
class User(Base): ... def incr_live_count(self): self.live_count += 1 session.commit() @property def url(self): return PEOPLE_URL.format(self.speaker_id) @property def lives_url(self): return LIVE_USER_URL.format(self.speaker_id) def to_dict(self): d = {c.name: getattr(self, c.name, None) for c in self.__table__.columns} d.update({ 'type': 'user', 'url': self.url, 'lives_url': self.lives_url }) return d
我习惯给model添加一个to_dict方法,把需要的字段和值拼成一个dict返回。当然有些API实际不需要这么多的字段,在下一篇中我会介绍怎么处理schema的问题。
最后是3个接口方法:
class User(Base): @classmethod def add(cls, **kwargs): speaker_id = kwargs.get('speaker_id', None) r = None if id is not None: q = session.query(cls).filter_by(speaker_id=speaker_id) r = q.first() if r: q.update(kwargs) if r is None: r = cls(**kwargs) session.add(r) try: session.commit() except: session.rollback() else: return r @classmethod def suggest(cls, q, start=0, limit=SUGGEST_USER_LIMIT): query = session.query(User) users = query.filter(User.name.like('%{}%'.format(q))).offset( start).limit(limit).all() return [user.to_dict() for user in users] @classmethod def get_all(cls, order_by='id', start=0, limit=10, desc=False): ''' :param order_by: One of ``'id'``, ``'live_count'`` or ``'updated_time'`` ''' query = session.query(User) order_by = getattr(User, order_by) if desc: order_by = _desc(order_by) users = query.order_by(order_by).offset(start).limit(limit).all() return [user.to_dict() for user in users]
需要注意add方法,其实叫做add_or_update更合适,需要使用session一定要commit才能提交数据。
sqlalchemy没有自带的suggest功能,只能用Like来实现。get_all方法就是上面第6个需求接口。
首先道歉,之前我理解的自定义analyzer的用法是错误的,下面的才是正确的姿势:
from elasticsearch_dsl.analysis import CustomAnalyzer ik_analyzer = CustomAnalyzer( 'ik_analyzer', tokenizer='ik_max_word', filter=['lowercase'] )
tokenizer字段是必选的,这里使用ik分词插件提供的ik_max_word。我还给Live添加了2个字段:
class Live(DocType): cover = Text(index='not_analyzed') # 对应专栏头图(如果有) zhuanlan_url = Text(index='not_analyzed') # 对应专栏地址
加上参数 index='not_analyzed'
是因为这2个字段不用于搜索和聚合,没必要分词,就当成数据库使用了。
也给Live添加一些属性和方法,方便最后用to_dict()生成需要的全部数据:
from .speaker import User, session class Live(DocType): @property def id(self): return self._id @property def speaker(self): return session.query(User).get(self.speaker_id) @property def url(self): return LIVE_URL.format(self.id) class Meta: index = 'live130' def to_dict(self, include_extended=True): d = super().to_dict() if include_extended: d.update({ 'id': self._id, 'type': 'live', 'speaker': self.speaker.to_dict(), 'url': self.url }) return d
其中speaker属性是常见的关联多个model的快捷方式,但是需要注意,竟然不要设计成A的model里面某个方法返回了B的model数据,B的model里面也返回了A的model的数据而造成只能进行方法内import。
用 super().to_dict()
的原因是DocType内置了to_dict方法,随便提一下,而且接收include meta参数,为True会包含index和doc type的元数据。
这个是今天的重点,昨天说的「让elasticsearch_dsl支持asyncio」就是给今天做准备。换汤不换药,说白了就是在合适的地方添加async/await关键字,先看个add的:
class Live(DocType): ... @classmethod async def add(cls, **kwargs): id = kwargs.pop('id', None) if id is None: return False live = cls(meta={'id': int(id)}, **kwargs) await live.save() return live
现在我们挨个实现需求,首先是搜索接口,由于DocType包含了search方法,得换个名字了:
class Live(DocType): ... async def _execute(cls, s, order_by=None): # 可以选择字段的排序,前面加-表示desc,不加就是默认的asc if order_by is not None: s = s.sort(order_by) lives = await s.execute() # 执行,要在这步之前拼好查询条件 return [live.to_dict() for live in lives] @classmethod def apply_weight(cls, s, start, limit): return s.query(Q('function_score', functions=[gauss_sf, log_sf])).extra( **{'from': start, 'size': limit}) @classmethod async def ik_search(cls, query, status=None, start=0, limit=10): s = cls.search() # 多字段匹配要搜索的内容,SEARCH_FIELDS中不同字段权重不同 s = s.query('multi_match', query=query, fields=SEARCH_FIELDS) if status is not None: # 根据结束状态过滤 s = s.query('match', status=status) # 搜索是带权重的,按照之前的设计做了时间衰减和归一化 s = cls.apply_weight(s, start, limit) return await cls._execute(s)
就是根据需求,按照DSL的方式来拼。我添加了些注释,看不懂的话可以按照文章开始的链接去找找答案。
然后是发现接口,7/30天热门都是基于这个接口,只不过划定了时间:
class Live(DocType): ... @classmethod async def explore(cls, from_date=None, to_date=None, order_by=None, start=0, limit=10, topic=None): s = cls.search() if topic is not None: s = s.query(Q('term', topic_names=topic)) starts_at = {} if from_date is not None: starts_at['from'] = from_date if to_date is not None: starts_at['to'] = to_date if starts_at: s = s.query(Q('range', starts_at=starts_at)) if order_by is None: s = cls.apply_weight(s, start, limit) return await cls._execute(s, order_by) @classmethod async def get_hot_weekly(cls): today = date.today() return await cls.explore(from_date=today - timedelta(days=7), to_date=today, limit=20) @classmethod async def get_hot_monthly(cls): today = date.today() return await cls.explore(from_date=today - timedelta(days=30), to_date=today, limit=50)
注意,explore方法如果指定了排序方案,就不会添加时间衰减和归一化的处理了。
然后是获取用户举报的全部Live的方法:
class Live(DocType): ... @classmethod async def ik_search_by_speaker_id(cls, speaker_id, order_by='-starts_at'): s = cls.search() s = s.query(Q('bool', should=Q('match', speaker_id=speaker_id))) return await cls._execute(s, order_by)
可以看到_execute方法抽象后被重复利用了。
再然后是suggest接口:
class Live(DocType): ... @classmethod async def ik_suggest(cls, query, size=10): s = cls.search() s = s.suggest('live_suggestion', query, completion={ 'field': 'live_suggest', 'fuzzy': {'fuzziness': 2}, 'size': size }) suggestions = await s.execute_suggest() matches = suggestions.live_suggestion[0].options ids = [match._id for match in matches] lives = await Live.mget(ids) return [live.to_dict() for live in lives]
其中支持2个编辑距离的模糊搜索。这个实现的比较简单,没有考虑拼音,也没有考虑搜索用户。值得一提的是DocType提供了mget这个获取多个id的接口,请善用减少网络请求,也就是给ES后端减压。
第4个获得热门话题的需求是本项目唯一用到聚合功能的地方了:
from .topic import Topic class Live(DocType): @classmethod async def get_hot_topics(cls, size=50): s = cls.search() s.aggs.bucket('topics', A('terms', field='topics', size=size)) rs = await s.execute() buckets = rs.aggregations.topics.buckets topic_names = [r['key'] for r in buckets] topics = session.query(Topic).filter(Topic.name.in_(topic_names)).all() topics = sorted(topics, key=lambda t: topic_names.index(t.name)) return [topic.to_dict() for topic in topics]
每个Live都会打话题标签,越多的live打这个话题就说明它越热门。
最后要说的是init()方法:
async def init(): await Live.init()
原来import模块的时候直接就init了,现在由于异步化了,直接init没人所以要在loop中用,比如在爬虫中:
from models.live import init as live_init if __name__ == '__main__': loop = asyncio.get_event_loop() crawler = Crawler() loop.run_until_complete(live_init()) loop.run_until_complete(crawler.crawl()) print('Finished in {:.3f} secs'.format(crawler.t1 - crawler.t0)) crawler.close() loop.close() es.transport.close()
理解了嘛?
好了全部接口都完成了,但是大家有木有感觉,异步编程调试起来很麻烦,我来教一个好用的方法.
asyncio要求把需要协程化的函数都放进一个loop,通过run_until_complete方法让它执行完成。
但是现在非常不好玩:
In : from models import Live In : live = Live.get(789840559912009728) In : live Out: <coroutine object DocType.get at 0x10a0d1fc0> In : live.subject --------------------------------------------------------------------------- AttributeError Traceback (most recent call last) <ipython-input-4-8c237874146c> in <module>() ----> 1 live.subject AttributeError: 'coroutine' object has no attribute 'subject'
异步化的函数(方法)用起来很不直观。一开始可以写个脚本把要调试的东西放进去用(test_es.py):
import asyncio from elasticsearch_dsl.connections import connections from models.live import Live, SEARCH_FIELDS, init as live_init s = Live.search() es = connections.get_connection(Live._doc_type.using) async def print_info(): rs = await s.query('multi_match', query='python', fields=SEARCH_FIELDS).execute() print(rs) loop = asyncio.get_event_loop() loop.run_until_complete(live_init()) loop.run_until_complete(print_info()) loop.close() es.transport.close()
这样也是可以调试的,很麻烦,对吧?
抽象一下,其实写个函数就好了:
import asyncio def execute(coro): loop = asyncio.get_event_loop() rs = loop.run_until_complete(coro) return rs
OK, 再用:
In : from models import Live, execute In : live = Live.get(789840559912009728) In : live = execute(live) In : live.subject Out: 'Python 工程师的入门和进阶'
这样就方便多了。