在 Flask 项目中使用 Celery 这篇文章谈到了如何在 Flask 项目中集成 Celery,也讲了在 celery 任务中引用 Flask 的 application context 的方法。一般情况下那样使用是没问题的,但是如果需要在 task 中使用 gevent,就需要一些额外的改进。至少有两点。
如果在 task 中要使用 gevent,就必须使用 gevent 并发模型。这很好处理,只需要修改启动选项就行:
$ celery worker -A celery_worker.celery -P gevent -c 10 -l INFO
上面的命令, -P
选项指定 pool,默认是 prefork,这里是 gevent; -c
设置并发数。
这个问题也是在 在 Flask 项目中使用 Celery 中重点讨论的,在这种场景下,上文的解决方法起不到作用,仍然会报错(具体原因不太懂,知道的朋友请不吝赐教)。解决方案就是,把需要引用 Flask app 的地方(如 app.config),放到 Flask 的 application context 里执行,如:
with app.app_context(): print app.config.get('SOME_CONFIG_KEY')
在实际应用中,我最后写了个装饰器来实现这个目的。简单介绍一下场景,项目用到了 Flask-Cache,项目启动时会创建全局单例 cache
,并在 create_app
中进行初始化。在 Flask-Cache 初始化时,会把当前的 Flask app 对象绑定到实例 cache
中,所以可以尝试从这里获取 app 对象。
代码的目录结构与之前一样:
. ├── README.md ├── app │ ├── __init__.py │ ├── config.py │ ├── forms │ ├── models │ ├── tasks │ │ ├── __init__.py │ │ └── email.py │ └── views │ │ ├── __init__.py │ │ └── account.py ├── celery_worker.py ├── manage.py └── wsgi.py
装饰器:
def with_app_context(task): memo = {'app': None} @functools.wraps(task) def _wrapper(*args, **kwargs): if not memo['app']: try: # 尝试从 cache 中获取 app 对象,如果得到的不是 None,就不需要重复创建了 app = cache.app _ = app.name except Exception: from app import create_app app = create_app() memo['app'] = app else: app = memo['app'] # 把 task 放到 application context 环境中运行 with app.app_context(): return task(*args, **kwargs) return _wrapper
使用:
@celery.task() @with_app_context def add(x, y): print app.config.get('SOME_CONFIG_KEY') return x + y