Celery:修订间差异
无编辑摘要 |
(→Docker) |
||
(未显示同一用户的12个中间版本) | |||
第1行: | 第1行: | ||
Celery是基于[[Python]] | Celery是基于[[Python]]的一个开源的分布式实时任务队列工具,同时也支持任务调度。 | ||
==简介== | ==简介== | ||
===时间轴=== | ===时间轴=== | ||
* | *2009年04月24日,Ask Solem 发布Celery 0.1.0 | ||
* | *2010年02月10日,发布Celery 1.0.0 | ||
* | *2016年11月04日,发布Celery 4.0.0。配置文件引入小写字母设置和重命名了一些前缀。 | ||
* | *2020年09月24日,发布Celery 5.0.0 | ||
* | *2021年06月18日,发布Celery 5.1 | ||
{{了解更多 | {{了解更多 | ||
第16行: | 第16行: | ||
pip install -U Celery | pip install -U Celery | ||
== | ==入门== | ||
=== | ===基本架构=== | ||
Celery架构是一种生产者消费者模型,生产者(调用异步任务或定时任务)将任务发送到消息中间件([[Redis]]或[[RabbitMQ]]等),消费者(任务执行单元)从消息中间件读取执行。 | |||
{{#drawio:celery架构}} | |||
===基本概念=== | |||
{| class="wikitable" style="width: 100%; | |||
! 名称 | |||
! 描述 | |||
|- | |||
| | |||
| | |||
|- | |||
| | |||
| | |||
|} | |||
===示例 === | |||
== 异步任务 == | |||
== 定时任务 == | |||
Celery也支持定时任务,即周期性任务(Periodic Tasks)。celery beat定期发布任务,然后由celery worker执行。 | |||
{{了解更多 | |||
|[https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html Celery 文档:周期性任务] | |||
}} | |||
=== 定时 === | |||
<code>schedule</code>用于设置执行的频率,可以是整数、timedelta、crontab、solar或自定义类型。一个示例: | |||
<syntaxhighlight lang="python"> | |||
from celery.schedules import crontab | |||
beat_schedule = { | |||
# 每个周星期一早上7点30分执行 | |||
'add-every-monday-morning': { | |||
'task': 'tasks.task1', | |||
'schedule': crontab(hour=7, minute=30, day_of_week=1), | |||
'args': (16, 16), | |||
}, | |||
} | |||
</syntaxhighlight> | |||
{| class="wikitable" style="width: 100%; | |||
! 类别 | |||
! 描述 | |||
! 示例 | |||
|- | |||
| 数字 | |||
| 表示秒 | |||
| <code>30.0</code> 每30秒执行一次 | |||
|- | |||
| [https://docs.python.org/dev/library/datetime.html#datetime.timedelta timedelta] | |||
| [[Python]]的timedelta对象,用于表示时间间隔。<br />使用时需要导入timedelta:<code>from datetime import timedelta</code> | |||
| <code>timedelta(seconds=30)</code> 每30秒执行一次 | |||
|- | |||
| [https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html#crontab-schedules crontab] | |||
| 能够方便设置每天或每星期什么时间执行。<br />使用时需要导入crontab:<code>from celery.schedules import crontab</code> | |||
| <code>crontab()</code> 每分钟执行 <br /><code>crontab(minute=0, hour=0)</code> 每天0点0分执行<br /><code>crontab(hour=7, minute=30, day_of_week=1)</code> 每个周星期一早上7点30分执行 <br /><code>crontab(minute=0, hour='*/6')</code> 每六个小时执行一次:午夜、早上 6 点、中午、下午 6 点 <br /><code>crontab(minute=0, hour='0,6,12,18')</code>同前面 | |||
|- | |||
| [https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html#solar-schedules solar] | |||
| 太阳时间表,按照日出、日落、黎明或黄昏来执行。<br />使用时需要导入solar:<code>from celery.schedules import solar</code> <br />格式:<code>solar(event, latitude, longitude)</code> | |||
| <code>'schedule': solar('sunset', -37.81753, 144.96715)</code> 墨尔本日落时执行 | |||
|- | |||
| 自定义 | |||
| | |||
| | |||
|} | |||
=== 启动 === | |||
启动celery beat服务: | |||
celery -A your_proj beat | |||
通过-B,也可在启动celery worker服务时,同时启动celery beat服务。这种简单方便,但只适用于单个worker节点,因此不建议用于生产用途: | |||
celery -A your_proj worker -B | |||
{{了解更多 | |||
|[https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html#starting-the-scheduler Celery 文档:周期性任务 - 启动调度器] | |||
}} | |||
== 配置 == | |||
=== 概览 === | |||
配置一般写在单独的文件中。如果使用默认加载器,文件名称需要为celeryconfig.py。Celery 4.0 版引入了小写设置和重命名一些前缀,旧的配置命令将在Celery 6.0不可用。一个简单的配置文件: | |||
<syntaxhighlight lang="python"> | |||
# celery_task为项目目录,包含__init__.py、celeryconfig.py、task1.py、task2.py | |||
# celery_task | |||
# ├── __init__.py | |||
# ├── celeryconfig.py | |||
# ├── task1.py | |||
# └── task2.py | |||
# 以下为celeryconfig.py文件内容: | |||
# 任务队列设置 | |||
broker_url = 'redis://127.0.0.1:6379' | |||
# 任务转态和结果存储设置 | |||
result_backend = 'redis://127.0.0.1:6379/0' | |||
# 指定时区,默认是 UTC | |||
timezone = 'Asia/Shanghai' | |||
# 导入任务模块 | |||
imports = ( | |||
'celery_task.task1', | |||
'celery_task.task2' | |||
) | |||
</syntaxhighlight> | |||
{{了解更多 | |||
|[https://docs.celeryproject.org/en/stable/userguide/configuration.html Celery 文档:配置和默认值] | |||
}} | |||
=== Broker === | |||
{| class="wikitable" style="width: 100%; | {| class="wikitable" style="width: 100%; | ||
! 名称 | ! 名称 | ||
! 描述 | ! 描述 | ||
! 示例 | |||
|- | |- | ||
| [https://docs.celeryproject.org/en/stable/userguide/configuration.html#broker-url broker_url] | |||
| 默认消息队列URL。格式:<br /><code>transport://userid:password@hostname:port/virtual_host</code> <br />transport默认为<code>amqp://</code>,其他可选<code>redis://</code>、<code>sqs://</code>和<code>qpid://</code>。 | |||
| <code><nowiki>broker_url = 'redis://127.0.0.1:6379'</nowiki></code>使用redis作为消息队列。 | |||
|- | |||
| | |||
| | | | ||
| | | | ||
|} | |||
=== 日期时间 === | |||
{| class="wikitable" style="width: 100%; | |||
! 名称 | |||
! 描述 | |||
! 示例 | |||
|- | |- | ||
| enable_utc | |||
| | | | ||
| | |||
|- | |||
| [https://docs.celeryproject.org/en/stable/userguide/configuration.html#timezone timezone] | |||
| 时区,默认"UTC" | |||
| | | | ||
|} | |} | ||
=== 结果存储 === | |||
== 监控和管理 == | |||
=== 概览 === | |||
Celery几个常用监控管理工具: | |||
{| class="wikitable" style="width: 100%; | |||
! 名称 | |||
! 描述 | |||
|- | |||
| 命令 | |||
| | |||
|- | |||
| [https://flower.readthedocs.io/ Flower] | |||
| Flower是Celery的实时网络监控和管理工具。 <br /><br />使用:<br /><syntaxhighlight lang="bash"> | |||
# 安装flower | |||
pip install flower | |||
# 为celery项目启动一个网络服务器,默认端口5555,http://localhost:5555/ | |||
celery -A celery项目 flower | |||
# 浏览器访问flower,http://localhost:5555/ | |||
</syntaxhighlight> | |||
|- | |||
| celery events | |||
| 一个简单监视管理工具,在 Celery 2.0 版本新增。可以显示任务和worker的历史,以及查看任务的结果和追溯,它还支持一些管理命令,如限制速度和关闭worker。 <br /><br />使用:<br /><code>celery -A celery项目 events</code> | |||
|} | |||
{{了解更多 | |||
|[https://docs.celeryproject.org/en/stable/userguide/monitoring.html Celery 文档:监控和管理] | |||
}} | |||
===Flower=== | |||
====docker==== | |||
使用docker compose可以快速搭建Flower测试环境,[https://github.com/mher/flower/blob/master/docker-compose.yml flower]官方有配置好的Celery + Flower + [[Redis]] + [[prometheus]] + [[grafana]]测试环境。 以下配置Celery + Flower + [[Redis]]环境。 | |||
项目结构: | |||
<syntaxhighlight lang="bash" > | |||
examples | |||
├── docker-compose.yml | |||
├── Dockerfile | |||
└── examples | |||
├── celeryconfig.py | |||
└── tasks.py | |||
</syntaxhighlight> | |||
{{了解更多 | |||
|[https://github.com/mher/flower/blob/master/docker-compose.yml Github.com/mher/flower/:docker-compose.yml] | |||
}} | |||
==Docker== | |||
===celery=== | |||
使用docker可以快速搭建环境,如搭建一个简单测试环境celery + [[redis]],项目目录如下: | |||
<syntaxhighlight lang="bash" > | |||
example | |||
├── docker-compose.yml | |||
├── Dockerfile | |||
└── tasks.py | |||
</syntaxhighlight> | |||
tasks.py用于配置Celery,注册异步函数 | |||
<syntaxhighlight lang="python" > | |||
from celery import Celery | |||
app = Celery('tasks', broker='redis://redis:6379/0', backend='redis://redis:6379/0' ) | |||
# 设置定时任务,每隔30秒执行一次add任务 | |||
app.conf.beat_schedule = { | |||
'add-every-30-seconds': { | |||
'task': 'tasks.add', | |||
'schedule': 30.0, | |||
'args': (16, 16) | |||
}, | |||
} | |||
# 注册异步任务 | |||
@app.task | |||
def add(x, y): | |||
return x + y | |||
</syntaxhighlight> | |||
Dockerfile是用于构建celery镜像,redis镜像直接使用官方。 | |||
<syntaxhighlight lang="Dockerfile" > | |||
FROM python:alpine | |||
# 安装celery和redis客户端 | |||
RUN pip install --no-cache-dir redis celery | |||
WORKDIR /data | |||
CMD ["celery"] | |||
</syntaxhighlight> | |||
docker compose组合管理容器 | |||
<syntaxhighlight lang="docker" > | |||
version: '3' | |||
services: | |||
redis: | |||
image: redis:alpine | |||
ports: | |||
- 6379:6379 | |||
worker: | |||
build: ./ | |||
entrypoint: celery | |||
command: -A tasks worker -l info -E | |||
user: nobody | |||
volumes: | |||
- ./:/data | |||
depends_on: | |||
- redis | |||
</syntaxhighlight> | |||
终端进入项目目录,使用<code>docker compose up</code>即可启动celery和redis服务,另开一个终端,使用<code>docker exec -it celery容器名称 python</code>命令进入容器并运行Python,调用add异步函数测试: | |||
<syntaxhighlight lang="python" > | |||
from tasks import add | |||
add.delay(3,3) | |||
</syntaxhighlight> | |||
定时任务需要启用celery beat,定时发送到任务队列。打开一个终端,使用<code>docker exec -it celery容器名称 celery -A tasks beat</code>命令使容器启动celery beat | |||
===常见错误=== | |||
* docker-compose.yml中<code>build: ./ </code>时,如果Dockerfile中使用pip安装,可能出现<code>Failed to establish a new connection: [Errno -3] Try again')':</code>错误。 | |||
将docker-compose.yml中<code>build: ./ </code>修改为:<syntaxhighlight lang="docker" > | |||
build: | |||
context: ./ | |||
network: host | |||
</syntaxhighlight> | |||
==资源== | ==资源== |
2022年12月17日 (六) 17:56的最新版本
Celery是基于Python的一个开源的分布式实时任务队列工具,同时也支持任务调度。
简介
时间轴
- 2009年04月24日,Ask Solem 发布Celery 0.1.0
- 2010年02月10日,发布Celery 1.0.0
- 2016年11月04日,发布Celery 4.0.0。配置文件引入小写字母设置和重命名了一些前缀。
- 2020年09月24日,发布Celery 5.0.0
- 2021年06月18日,发布Celery 5.1
了解更多 >> Celery 文档:历史
安装
使用pip安装:
pip install -U Celery
入门
基本架构
Celery架构是一种生产者消费者模型,生产者(调用异步任务或定时任务)将任务发送到消息中间件(Redis或RabbitMQ等),消费者(任务执行单元)从消息中间件读取执行。
基本概念
名称 | 描述 |
---|---|
示例
异步任务
定时任务
Celery也支持定时任务,即周期性任务(Periodic Tasks)。celery beat定期发布任务,然后由celery worker执行。
了解更多 >> Celery 文档:周期性任务
定时
schedule
用于设置执行的频率,可以是整数、timedelta、crontab、solar或自定义类型。一个示例:
from celery.schedules import crontab
beat_schedule = {
# 每个周星期一早上7点30分执行
'add-every-monday-morning': {
'task': 'tasks.task1',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}
类别 | 描述 | 示例 |
---|---|---|
数字 | 表示秒 | 30.0 每30秒执行一次
|
timedelta | Python的timedelta对象,用于表示时间间隔。 使用时需要导入timedelta: from datetime import timedelta
|
timedelta(seconds=30) 每30秒执行一次
|
crontab | 能够方便设置每天或每星期什么时间执行。 使用时需要导入crontab: from celery.schedules import crontab
|
crontab() 每分钟执行 crontab(minute=0, hour=0) 每天0点0分执行crontab(hour=7, minute=30, day_of_week=1) 每个周星期一早上7点30分执行 crontab(minute=0, hour='*/6') 每六个小时执行一次:午夜、早上 6 点、中午、下午 6 点 crontab(minute=0, hour='0,6,12,18') 同前面
|
solar | 太阳时间表,按照日出、日落、黎明或黄昏来执行。 使用时需要导入solar: from celery.schedules import solar 格式: solar(event, latitude, longitude)
|
'schedule': solar('sunset', -37.81753, 144.96715) 墨尔本日落时执行
|
自定义 |
启动
启动celery beat服务:
celery -A your_proj beat
通过-B,也可在启动celery worker服务时,同时启动celery beat服务。这种简单方便,但只适用于单个worker节点,因此不建议用于生产用途:
celery -A your_proj worker -B
了解更多 >> Celery 文档:周期性任务 - 启动调度器
配置
概览
配置一般写在单独的文件中。如果使用默认加载器,文件名称需要为celeryconfig.py。Celery 4.0 版引入了小写设置和重命名一些前缀,旧的配置命令将在Celery 6.0不可用。一个简单的配置文件:
# celery_task为项目目录,包含__init__.py、celeryconfig.py、task1.py、task2.py
# celery_task
# ├── __init__.py
# ├── celeryconfig.py
# ├── task1.py
# └── task2.py
# 以下为celeryconfig.py文件内容:
# 任务队列设置
broker_url = 'redis://127.0.0.1:6379'
# 任务转态和结果存储设置
result_backend = 'redis://127.0.0.1:6379/0'
# 指定时区,默认是 UTC
timezone = 'Asia/Shanghai'
# 导入任务模块
imports = (
'celery_task.task1',
'celery_task.task2'
)
了解更多 >> Celery 文档:配置和默认值
Broker
名称 | 描述 | 示例 |
---|---|---|
broker_url | 默认消息队列URL。格式:transport://userid:password@hostname:port/virtual_host transport默认为 amqp:// ,其他可选redis:// 、sqs:// 和qpid:// 。
|
broker_url = 'redis://127.0.0.1:6379' 使用redis作为消息队列。
|
日期时间
名称 | 描述 | 示例 |
---|---|---|
enable_utc | ||
timezone | 时区,默认"UTC" |
结果存储
监控和管理
概览
Celery几个常用监控管理工具:
名称 | 描述 |
---|---|
命令 | |
Flower | Flower是Celery的实时网络监控和管理工具。 使用: # 安装flower
pip install flower
# 为celery项目启动一个网络服务器,默认端口5555,http://localhost:5555/
celery -A celery项目 flower
# 浏览器访问flower,http://localhost:5555/
|
celery events | 一个简单监视管理工具,在 Celery 2.0 版本新增。可以显示任务和worker的历史,以及查看任务的结果和追溯,它还支持一些管理命令,如限制速度和关闭worker。 使用: celery -A celery项目 events
|
了解更多 >> Celery 文档:监控和管理
Flower
docker
使用docker compose可以快速搭建Flower测试环境,flower官方有配置好的Celery + Flower + Redis + prometheus + grafana测试环境。 以下配置Celery + Flower + Redis环境。
项目结构:
examples
├── docker-compose.yml
├── Dockerfile
└── examples
├── celeryconfig.py
└── tasks.py
Docker
celery
使用docker可以快速搭建环境,如搭建一个简单测试环境celery + redis,项目目录如下:
example
├── docker-compose.yml
├── Dockerfile
└── tasks.py
tasks.py用于配置Celery,注册异步函数
from celery import Celery
app = Celery('tasks', broker='redis://redis:6379/0', backend='redis://redis:6379/0' )
# 设置定时任务,每隔30秒执行一次add任务
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}
# 注册异步任务
@app.task
def add(x, y):
return x + y
Dockerfile是用于构建celery镜像,redis镜像直接使用官方。
FROM python:alpine
# 安装celery和redis客户端
RUN pip install --no-cache-dir redis celery
WORKDIR /data
CMD ["celery"]
docker compose组合管理容器
version: '3'
services:
redis:
image: redis:alpine
ports:
- 6379:6379
worker:
build: ./
entrypoint: celery
command: -A tasks worker -l info -E
user: nobody
volumes:
- ./:/data
depends_on:
- redis
终端进入项目目录,使用docker compose up
即可启动celery和redis服务,另开一个终端,使用docker exec -it celery容器名称 python
命令进入容器并运行Python,调用add异步函数测试:
from tasks import add
add.delay(3,3)
定时任务需要启用celery beat,定时发送到任务队列。打开一个终端,使用docker exec -it celery容器名称 celery -A tasks beat
命令使容器启动celery beat
常见错误
- docker-compose.yml中
build: ./
时,如果Dockerfile中使用pip安装,可能出现Failed to establish a new connection: [Errno -3] Try again')':
错误。
将docker-compose.yml中build: ./
修改为:
build:
context: ./
network: host
资源
官网
- Celery 官网: https://docs.celeryproject.org/
- Celery 文档: https://docs.celeryproject.org/
- Celery 下载:https://pypi.org/project/celery/
- Celery 源代码:https://github.com/celery/celery
相关教程
- 极客学院wiki:Python 之旅 - Celery
- celerycn.io:Celery 中文手册
- 在 Flask 中使用 Celery
- Miguel Grinberg:Using Celery With Flask