转载

知乎 Live 全文搜索之模型接口

知乎 Live 全文搜索之模型接口

看这篇文章前推荐阅读相关的如下文章:

  1. 知乎Live全文搜索之让elasticsearch_dsl支持asyncio

  2. 知乎Live全文搜索之使用Elasticsearch全文搜索

  3. 知乎Live全文搜索之模型设计和爬虫实现

  4. 知乎Live全文搜索之使用Elasticsearch做聚合分析

  5. 知乎Live全文搜索之使用Elasticsearch做搜索建议

在 知乎Live全文搜索之让elasticsearch_dsl支持asyncio  一文中,我把后端工作分成了4步,今天是完成爬虫和模型接口这2步,接口返回的数据会被微信小程序使用。

详细的列一下接口需求:

  1. 搜索。搜索符合输入的关键字的Live和用户,按照之前提到的各种策略排序,也支持通过status状态过滤「已结束」和「未结束」2种类型的Live。支持分页。

  2. 搜索建议。提供符合输入的关键字的Live的建议。

  3. 发现。把全部的Live按照之前提到的各种策略排序,可以通过各种字段排序,可以选择Live开始的时间范围(默认是全部)。

  4. 获取热门话题。

  5. 获取某话题详细信息及话题下的Live,支持分页、排序、时间范围。

  6. 获取全部用户,并且可以按照举办的Live数量、更新Live时间等条件排序。

  7. 获取单个用户信息。

  8. 根据各种策略排序,获取7天热门Live,非知乎排序。

  9. 根据各种策略排序,获取30天热门Live,非知乎排序。

添加Topic模型

由于4和5的需求,我添加了Topic这个模型,上篇文章说过SQLite不支持并发,所以替换成了MySQL,要把config里面的DB_URI改成如下格式:

DB_URI = 'mysql+pymysql://localhost/test?charset=utf8mb4'

其中test是库的名字,charset要用utf8mb4,因为有些用户信息甚至Live的标题里面包含emoji。MySQL的客户端用的是PyMySQL,需要在schema上指出来。

Topic类和之前的User格式差不多,只是不同的字段,限于篇幅就不列出来了。

实现用户相关方法

为了实现可以按照举办的Live数量、更新Live时间排序,我添加了2个字段,也改了字符集:

from config import SUGGEST_USER_LIMIT, PEOPLE_URL, LIVE_USER_URL
class User(Base):
   __tablename__ = 'users'
   __table_args__ = {
       'mysql_engine': 'InnoDB',
       'mysql_charset': 'utf8mb4'
   }
   ...
   live_count = Column(Integer, default=0)
   updated_time = Column(DateTime, default=datetime.now)

接着添加一些必要的方法:

class User(Base):
   ...
   def incr_live_count(self):
       self.live_count += 1
       session.commit()
   @property
   def url(self):
       return PEOPLE_URL.format(self.speaker_id)
   @property
   def lives_url(self):
       return LIVE_USER_URL.format(self.speaker_id)
   def to_dict(self):
       d = {c.name: getattr(self, c.name, None)
            for c in self.__table__.columns}
       d.update({
           'type': 'user',
           'url': self.url,
           'lives_url': self.lives_url
       })
       return d

我习惯给model添加一个to_dict方法,把需要的字段和值拼成一个dict返回。当然有些API实际不需要这么多的字段,在下一篇中我会介绍怎么处理schema的问题。

最后是3个接口方法:

class User(Base):
   @classmethod
   def add(cls, **kwargs):
       speaker_id = kwargs.get('speaker_id', None)
       r = None
       if id is not None:
           q = session.query(cls).filter_by(speaker_id=speaker_id)
           r = q.first()
           if r:
               q.update(kwargs)
       if r is None:
           r = cls(**kwargs)
           session.add(r)
       try:
           session.commit()
       except:
           session.rollback()
       else:
           return r
   @classmethod
   def suggest(cls, q, start=0, limit=SUGGEST_USER_LIMIT):
       query = session.query(User)
       users = query.filter(User.name.like('%{}%'.format(q))).offset(
           start).limit(limit).all()
       return [user.to_dict() for user in users]
   @classmethod
   def get_all(cls, order_by='id', start=0, limit=10, desc=False):
       '''
       :param order_by:  One of ``'id'``, ``'live_count'`` or
                         ``'updated_time'``
       '''
       query = session.query(User)
       order_by = getattr(User, order_by)
       if desc:
           order_by = _desc(order_by)
       users = query.order_by(order_by).offset(start).limit(limit).all()
       return [user.to_dict() for user in users]

需要注意add方法,其实叫做add_or_update更合适,需要使用session一定要commit才能提交数据。

sqlalchemy没有自带的suggest功能,只能用Like来实现。get_all方法就是上面第6个需求接口。

完成Live模型字段

首先道歉,之前我理解的自定义analyzer的用法是错误的,下面的才是正确的姿势:

from elasticsearch_dsl.analysis import CustomAnalyzer
ik_analyzer = CustomAnalyzer(
   'ik_analyzer', tokenizer='ik_max_word',
   filter=['lowercase']
)

tokenizer字段是必选的,这里使用ik分词插件提供的ik_max_word。我还给Live添加了2个字段:

class Live(DocType):
   cover = Text(index='not_analyzed')  # 对应专栏头图(如果有)
   zhuanlan_url = Text(index='not_analyzed') # 对应专栏地址

加上参数 index='not_analyzed' 是因为这2个字段不用于搜索和聚合,没必要分词,就当成数据库使用了。

也给Live添加一些属性和方法,方便最后用to_dict()生成需要的全部数据:

from .speaker import User, session
class Live(DocType):
   @property
   def id(self):
       return self._id
   @property
   def speaker(self):
       return session.query(User).get(self.speaker_id)
   @property
   def url(self):
       return LIVE_URL.format(self.id)
   class Meta:
       index = 'live130'
   def to_dict(self, include_extended=True):
       d = super().to_dict()
       if include_extended:
           d.update({
               'id': self._id,
               'type': 'live',
               'speaker': self.speaker.to_dict(),
               'url': self.url
           })
       return d

其中speaker属性是常见的关联多个model的快捷方式,但是需要注意,竟然不要设计成A的model里面某个方法返回了B的model数据,B的model里面也返回了A的model的数据而造成只能进行方法内import。

super().to_dict() 的原因是DocType内置了to_dict方法,随便提一下,而且接收include meta参数,为True会包含index和doc type的元数据。

把Live设计成异步的

这个是今天的重点,昨天说的「让elasticsearch_dsl支持asyncio」就是给今天做准备。换汤不换药,说白了就是在合适的地方添加async/await关键字,先看个add的:

class Live(DocType):
   ...
   @classmethod
   async def add(cls, **kwargs):
       id = kwargs.pop('id', None)
       if id is None:
           return False
       live = cls(meta={'id': int(id)}, **kwargs)
       await live.save()
       return live

现在我们挨个实现需求,首先是搜索接口,由于DocType包含了search方法,得换个名字了:

class Live(DocType):
   ...
   async def _execute(cls, s, order_by=None):
       # 可以选择字段的排序,前面加-表示desc,不加就是默认的asc
       if order_by is not None:
           s = s.sort(order_by)
       lives = await s.execute()  # 执行,要在这步之前拼好查询条件
       return [live.to_dict() for live in lives]
   @classmethod
   def apply_weight(cls, s, start, limit):
       return s.query(Q('function_score', functions=[gauss_sf, log_sf])).extra(
           **{'from': start, 'size': limit})
   @classmethod
   async def ik_search(cls, query, status=None, start=0, limit=10):
       s = cls.search()
       # 多字段匹配要搜索的内容,SEARCH_FIELDS中不同字段权重不同
       s = s.query('multi_match', query=query,
                   fields=SEARCH_FIELDS)
       if status is not None:  # 根据结束状态过滤
           s = s.query('match', status=status)
       # 搜索是带权重的,按照之前的设计做了时间衰减和归一化
       s = cls.apply_weight(s, start, limit)
       return await cls._execute(s)

就是根据需求,按照DSL的方式来拼。我添加了些注释,看不懂的话可以按照文章开始的链接去找找答案。

然后是发现接口,7/30天热门都是基于这个接口,只不过划定了时间:

class Live(DocType):
   ...
   @classmethod
   async def explore(cls, from_date=None, to_date=None, order_by=None,
                     start=0, limit=10, topic=None):
       s = cls.search()
       if topic is not None:
           s = s.query(Q('term', topic_names=topic))
       starts_at = {}
       if from_date is not None:
           starts_at['from'] = from_date
       if to_date is not None:
           starts_at['to'] = to_date
       if starts_at:
           s = s.query(Q('range', starts_at=starts_at))
       if order_by is None:
           s = cls.apply_weight(s, start, limit)
       return await cls._execute(s, order_by)
   @classmethod
   async def get_hot_weekly(cls):
       today = date.today()
       return await cls.explore(from_date=today - timedelta(days=7),
                                 to_date=today, limit=20)
   @classmethod
   async def get_hot_monthly(cls):
       today = date.today()
       return await cls.explore(from_date=today - timedelta(days=30),
                                to_date=today, limit=50)

注意,explore方法如果指定了排序方案,就不会添加时间衰减和归一化的处理了。

然后是获取用户举报的全部Live的方法:

class Live(DocType):
   ...
   @classmethod
   async def ik_search_by_speaker_id(cls, speaker_id, order_by='-starts_at'):
       s = cls.search()
       s = s.query(Q('bool', should=Q('match', speaker_id=speaker_id)))
       return await cls._execute(s, order_by)

可以看到_execute方法抽象后被重复利用了。

再然后是suggest接口:

class Live(DocType):
   ...
   @classmethod
   async def ik_suggest(cls, query, size=10):
       s = cls.search()
       s = s.suggest('live_suggestion', query, completion={
           'field': 'live_suggest', 'fuzzy': {'fuzziness': 2}, 'size': size
       })
       suggestions = await s.execute_suggest()
       matches = suggestions.live_suggestion[0].options
       ids = [match._id for match in matches]
       lives = await Live.mget(ids)
       return [live.to_dict() for live in lives]

其中支持2个编辑距离的模糊搜索。这个实现的比较简单,没有考虑拼音,也没有考虑搜索用户。值得一提的是DocType提供了mget这个获取多个id的接口,请善用减少网络请求,也就是给ES后端减压。

第4个获得热门话题的需求是本项目唯一用到聚合功能的地方了:

from .topic import Topic
class Live(DocType):
   @classmethod
   async def get_hot_topics(cls, size=50):
       s = cls.search()
       s.aggs.bucket('topics', A('terms', field='topics', size=size))
       rs = await s.execute()
       buckets = rs.aggregations.topics.buckets
       topic_names = [r['key'] for r in buckets]
       topics = session.query(Topic).filter(Topic.name.in_(topic_names)).all()
       topics = sorted(topics, key=lambda t: topic_names.index(t.name))
       return [topic.to_dict() for topic in topics]

每个Live都会打话题标签,越多的live打这个话题就说明它越热门。

最后要说的是init()方法:

async def init():
   await Live.init()

原来import模块的时候直接就init了,现在由于异步化了,直接init没人所以要在loop中用,比如在爬虫中:

from models.live import init as live_init
if __name__ == '__main__':
   loop = asyncio.get_event_loop()
   crawler = Crawler()
   loop.run_until_complete(live_init())
   loop.run_until_complete(crawler.crawl())
   print('Finished in {:.3f} secs'.format(crawler.t1 - crawler.t0))
   crawler.close()
   loop.close()
   es.transport.close()

理解了嘛?

好了全部接口都完成了,但是大家有木有感觉,异步编程调试起来很麻烦,我来教一个好用的方法.

调试async程序

asyncio要求把需要协程化的函数都放进一个loop,通过run_until_complete方法让它执行完成。

但是现在非常不好玩:

In : from models import Live
In : live = Live.get(789840559912009728)
In : live
Out: <coroutine object DocType.get at 0x10a0d1fc0>
In : live.subject
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-4-8c237874146c> in <module>()
----> 1 live.subject
AttributeError: 'coroutine' object has no attribute 'subject'

异步化的函数(方法)用起来很不直观。一开始可以写个脚本把要调试的东西放进去用(test_es.py):

import asyncio
from elasticsearch_dsl.connections import  connections
from models.live import Live, SEARCH_FIELDS, init as live_init
s = Live.search()
es = connections.get_connection(Live._doc_type.using)
async def print_info():
   rs = await s.query('multi_match', query='python',
                      fields=SEARCH_FIELDS).execute()
   print(rs)
loop = asyncio.get_event_loop()
loop.run_until_complete(live_init())
loop.run_until_complete(print_info())
loop.close()
es.transport.close()

这样也是可以调试的,很麻烦,对吧?

抽象一下,其实写个函数就好了:

import asyncio
def execute(coro):
   loop = asyncio.get_event_loop()
   rs = loop.run_until_complete(coro)
   return rs

OK, 再用:

In : from models import Live, execute
In : live = Live.get(789840559912009728)
In : live = execute(live)
In : live.subject
Out: 'Python 工程师的入门和进阶'

这样就方便多了。

知乎 Live 全文搜索之模型接口

原文  http://mp.weixin.qq.com/s/rptOMPZ2eYUIGBF393nTxg
正文到此结束
Loading...