转载

在 Flask 项目的 celery 中使用 gevent

在 Flask 项目中使用 Celery 这篇文章谈到了如何在 Flask 项目中集成 Celery,也讲了在 celery 任务中引用 Flask 的 application context 的方法。一般情况下那样使用是没问题的,但是如果需要在 task 中使用 gevent,就需要一些额外的改进。至少有两点。

1. 使用 gevent 并发模型

如果在 task 中要使用 gevent,就必须使用 gevent 并发模型。这很好处理,只需要修改启动选项就行:

$ celery worker -A celery_worker.celery -P gevent -c 10 -l INFO 

上面的命令, -P 选项指定 pool,默认是 prefork,这里是 gevent; -c 设置并发数。

2. 引用 Flask 的 application context

这个问题也是在 在 Flask 项目中使用 Celery 中重点讨论的,在这种场景下,上文的解决方法起不到作用,仍然会报错(具体原因不太懂,知道的朋友请不吝赐教)。解决方案就是,把需要引用 Flask app 的地方(如 app.config),放到 Flask 的 application context 里执行,如:

with app.app_context():     print app.config.get('SOME_CONFIG_KEY') 

在实际应用中,我最后写了个装饰器来实现这个目的。简单介绍一下场景,项目用到了 Flask-Cache,项目启动时会创建全局单例 cache ,并在 create_app 中进行初始化。在 Flask-Cache 初始化时,会把当前的 Flask app 对象绑定到实例 cache 中,所以可以尝试从这里获取 app 对象。

代码的目录结构与之前一样:

. ├── README.md ├── app │   ├── __init__.py │   ├── config.py │   ├── forms │   ├── models │   ├── tasks │   │   ├── __init__.py │   │   └── email.py │   └── views │   │   ├── __init__.py │   │   └── account.py ├── celery_worker.py ├── manage.py └── wsgi.py 

装饰器:

def with_app_context(task):     memo = {'app': None}      @functools.wraps(task)     def _wrapper(*args, **kwargs):         if not memo['app']:             try:                 # 尝试从 cache 中获取 app 对象,如果得到的不是 None,就不需要重复创建了                 app = cache.app                 _ = app.name             except Exception:                 from app import create_app                  app = create_app()             memo['app'] = app         else:             app = memo['app']          # 把 task 放到 application context 环境中运行         with app.app_context():             return task(*args, **kwargs)      return _wrapper 

使用:

@celery.task() @with_app_context def add(x, y):     print app.config.get('SOME_CONFIG_KEY')     return x + y 
原文  http://liyangliang.me/posts/2016/05/using-celery-with-flask-and-gevent/
正文到此结束
Loading...