Celery
Celery是基于Python的一个开源的分布式实时任务队列工具,同时也支持任务调度。
简介
时间轴
- 2009年04月24日,Ask Solem 发布Celery 0.1.0
- 2010年02月10日,发布Celery 1.0.0
- 2016年11月04日,发布Celery 4.0.0。配置文件引入小写字母设置和重命名了一些前缀。
- 2020年09月24日,发布Celery 5.0.0
- 2021年06月18日,发布Celery 5.1
了解更多 >> Celery 文档:历史
安装
使用pip安装:
pip install -U Celery
入门
基本架构
Celery架构是一种生产者消费者模型,生产者(调用异步任务或定时任务)将任务发送到消息中间件(Redis或RabbitMQ等),消费者(任务执行单元)从消息中间件读取执行。
基本概念
名称 | 描述 |
---|---|
示例
异步任务
定时任务
Celery也支持定时任务,即周期性任务(Periodic Tasks)。celery beat定期发布任务,然后由celery worker执行。
了解更多 >> Celery 文档:周期性任务
定时
schedule
用于设置执行的频率,可以是整数、timedelta、crontab、solar或自定义类型。一个示例:
from celery.schedules import crontab
beat_schedule = {
# 每个周星期一早上7点30分执行
'add-every-monday-morning': {
'task': 'tasks.task1',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}
类别 | 描述 | 示例 |
---|---|---|
数字 | 表示秒 | 30.0 每30秒执行一次
|
timedelta | Python的timedelta对象,用于表示时间间隔。 使用时需要导入timedelta: from datetime import timedelta
|
timedelta(seconds=30) 每30秒执行一次
|
crontab | 能够方便设置每天或每星期什么时间执行。 使用时需要导入crontab: from celery.schedules import crontab
|
crontab() 每分钟执行 crontab(minute=0, hour=0) 每天0点0分执行crontab(hour=7, minute=30, day_of_week=1) 每个周星期一早上7点30分执行 crontab(minute=0, hour='*/6') 每六个小时执行一次:午夜、早上 6 点、中午、下午 6 点 crontab(minute=0, hour='0,6,12,18') 同前面
|
solar | 太阳时间表,按照日出、日落、黎明或黄昏来执行。 使用时需要导入solar: from celery.schedules import solar 格式: solar(event, latitude, longitude)
|
'schedule': solar('sunset', -37.81753, 144.96715) 墨尔本日落时执行
|
自定义 |
启动
启动celery beat服务:
celery -A your_proj beat
通过-B,也可在启动celery worker服务时,同时启动celery beat服务。这种简单方便,但只适用于单个worker节点,因此不建议用于生产用途:
celery -A your_proj worker -B
了解更多 >> Celery 文档:周期性任务 - 启动调度器
配置
概览
配置一般写在单独的文件中。如果使用默认加载器,文件名称需要为celeryconfig.py。Celery 4.0 版引入了小写设置和重命名一些前缀,旧的配置命令将在Celery 6.0不可用。一个简单的配置文件:
# celery_task为项目目录,包含__init__.py、celeryconfig.py、task1.py、task2.py
# celery_task
# ├── __init__.py
# ├── celeryconfig.py
# ├── task1.py
# └── task2.py
# 以下为celeryconfig.py文件内容:
# 任务队列设置
broker_url = 'redis://127.0.0.1:6379'
# 任务转态和结果存储设置
result_backend = 'redis://127.0.0.1:6379/0'
# 指定时区,默认是 UTC
timezone = 'Asia/Shanghai'
# 导入任务模块
imports = (
'celery_task.task1',
'celery_task.task2'
)
了解更多 >> Celery 文档:配置和默认值
Broker
名称 | 描述 | 示例 |
---|---|---|
broker_url | 默认消息队列URL。格式:transport://userid:password@hostname:port/virtual_host transport默认为 amqp:// ,其他可选redis:// 、sqs:// 和qpid:// 。
|
broker_url = 'redis://127.0.0.1:6379' 使用redis作为消息队列。
|
日期时间
名称 | 描述 | 示例 |
---|---|---|
enable_utc | ||
timezone | 时区,默认"UTC" |
结果存储
监控和管理
概览
Celery几个常用监控管理工具:
名称 | 描述 |
---|---|
命令 | |
Flower | Flower是Celery的实时网络监控和管理工具。 使用: # 安装flower
pip install flower
# 为celery项目启动一个网络服务器,默认端口5555,http://localhost:5555/
celery -A celery项目 flower
# 浏览器访问flower,http://localhost:5555/
|
celery events | 一个简单监视管理工具,在 Celery 2.0 版本新增。可以显示任务和worker的历史,以及查看任务的结果和追溯,它还支持一些管理命令,如限制速度和关闭worker。 使用: celery -A celery项目 events
|
了解更多 >> Celery 文档:监控和管理
Flower
docker
使用docker compose可以快速搭建Flower测试环境,flower官方有配置好的Celery + Flower + Redis + prometheus + grafana测试环境。 以下配置Celery + Flower + Redis环境。
项目结构:
examples
├── docker-compose.yml
├── Dockerfile
└── examples
├── celeryconfig.py
└── tasks.py
Docker
celery
使用docker可以快速搭建环境,如搭建一个简单测试环境celery + redis,项目目录如下:
example
├── docker-compose.yml
├── Dockerfile
└── tasks.py
tasks.py用于配置Celery,注册异步函数
from celery import Celery
app = Celery('tasks', broker='redis://redis:6379/0', backend='redis://redis:6379/0' )
# 设置定时任务,每隔30秒执行一次add任务
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}
# 注册异步任务
@app.task
def add(x, y):
return x + y
Dockerfile是用于构建celery镜像,redis镜像直接使用官方。
FROM python:alpine
# 安装celery和redis客户端
RUN pip install --no-cache-dir redis celery
WORKDIR /data
CMD ["celery"]
docker compose组合管理容器
version: '3'
services:
redis:
image: redis:alpine
ports:
- 6379:6379
worker:
build: ./
entrypoint: celery
command: -A tasks worker -l info -E
user: nobody
volumes:
- ./:/data
depends_on:
- redis
终端进入项目目录,使用docker compose up
即可启动celery和redis服务,另开一个终端,使用docker exec -it celery容器名称 python
命令进入容器并运行Python,调用add异步函数测试:
from tasks import add
add.delay(3,3)
定时任务需要启用celery beat,定时发送到任务队列。打开一个终端,使用docker exec -it celery容器名称 celery -A tasks beat
命令使容器启动celery beat
常见错误
- docker-compose.yml中
build: ./
时,如果Dockerfile中使用pip安装,可能出现Failed to establish a new connection: [Errno -3] Try again')':
错误。
将docker-compose.yml中build: ./
修改为:
build:
context: ./
network: host
资源
官网
- Celery 官网: https://docs.celeryproject.org/
- Celery 文档: https://docs.celeryproject.org/
- Celery 下载:https://pypi.org/project/celery/
- Celery 源代码:https://github.com/celery/celery
相关教程
- 极客学院wiki:Python 之旅 - Celery
- celerycn.io:Celery 中文手册
- 在 Flask 中使用 Celery
- Miguel Grinberg:Using Celery With Flask