Celery:修订间差异

无编辑摘要
 
(未显示同一用户的7个中间版本)
第64行: 第64行:
! 示例
! 示例
|-
|-
| 整数
| 数字
| 表示秒
| 表示秒
| <code>30.0</code> 每30秒运行一次
| <code>30.0</code> 每30秒执行一次
|-  
|-  
| [https://docs.python.org/dev/library/datetime.html#datetime.timedelta timedelta]
| [https://docs.python.org/dev/library/datetime.html#datetime.timedelta timedelta]
| Pythont的timedelta对象,<br />使用时需要导入timedelta:<code>from datetime import timedelta</code>
| [[Python]]的timedelta对象,用于表示时间间隔。<br />使用时需要导入timedelta:<code>from datetime import timedelta</code>
|  
| <code>timedelta(seconds=30)</code> 每30秒执行一次
|-  
|-  
| [https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html#crontab-schedules crontab]
| [https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html#crontab-schedules crontab]
| 能够方便设置每天或每星期什么时间执行。<br />使用时需要导入crontab:<code>from celery.schedules import crontab</code>
| 能够方便设置每天或每星期什么时间执行。<br />使用时需要导入crontab:<code>from celery.schedules import crontab</code>
| <code>crontab()</code> 每分钟执行 <br /><code>crontab(minute=0, hour=0</code> 每天0点0分执行<br /><code>crontab(hour=7, minute=30, day_of_week=1)</code> 每个周星期一早上7点30分执行  
| <code>crontab()</code> 每分钟执行 <br /><code>crontab(minute=0, hour=0)</code> 每天0点0分执行<br /><code>crontab(hour=7, minute=30, day_of_week=1)</code> 每个周星期一早上7点30分执行 <br /><code>crontab(minute=0, hour='*/6')</code> 每六个小时执行一次:午夜、早上 6 点、中午、下午 6 点 <br /><code>crontab(minute=0, hour='0,6,12,18')</code>同前面
|-  
|-  
| [https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html#solar-schedules solar]
| [https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html#solar-schedules solar]
第101行: 第101行:
<syntaxhighlight lang="python">
<syntaxhighlight lang="python">
# celery_task为项目目录,包含__init__.py、celeryconfig.py、task1.py、task2.py
# celery_task为项目目录,包含__init__.py、celeryconfig.py、task1.py、task2.py
# celery_task
# ├── __init__.py
# ├── celeryconfig.py
# ├── task1.py
# └── task2.py
# 以下为celeryconfig.py文件内容:
# 以下为celeryconfig.py文件内容:


第155行: 第160行:


=== 结果存储 ===
=== 结果存储 ===
== 监控和管理 ==
=== 概览 ===
Celery几个常用监控管理工具:
{| class="wikitable"  style="width: 100%;
! 名称
! 描述
|-
| 命令
|
|-
| [https://flower.readthedocs.io/ Flower]
| Flower是Celery的实时网络监控和管理工具。 <br /><br />使用:<br /><syntaxhighlight lang="bash">
# 安装flower
pip install flower
# 为celery项目启动一个网络服务器,默认端口5555,http://localhost:5555/
celery -A celery项目 flower
# 浏览器访问flower,http://localhost:5555/
</syntaxhighlight>
|-
| celery events
| 一个简单监视管理工具,在 Celery 2.0 版本新增。可以显示任务和worker的历史,以及查看任务的结果和追溯,它还支持一些管理命令,如限制速度和关闭worker。 <br /><br />使用:<br /><code>celery -A celery项目 events</code>
|}
{{了解更多
|[https://docs.celeryproject.org/en/stable/userguide/monitoring.html Celery 文档:监控和管理]
}}
===Flower===
====docker====
使用docker compose可以快速搭建Flower测试环境,[https://github.com/mher/flower/blob/master/docker-compose.yml flower]官方有配置好的Celery + Flower + [[Redis]] + [[prometheus]] + [[grafana]]测试环境。 以下配置Celery + Flower + [[Redis]]环境。
项目结构:
<syntaxhighlight lang="bash" >
examples
├── docker-compose.yml
├── Dockerfile
└── examples
    ├── celeryconfig.py
    └── tasks.py
</syntaxhighlight>
{{了解更多
|[https://github.com/mher/flower/blob/master/docker-compose.yml Github.com/mher/flower/:docker-compose.yml]
}}
==Docker==
===celery===
使用docker可以快速搭建环境,如搭建一个简单测试环境celery + [[redis]],项目目录如下:
<syntaxhighlight lang="bash" >
example                       
├── docker-compose.yml   
├── Dockerfile
└── tasks.py
</syntaxhighlight>
tasks.py用于配置Celery,注册异步函数
<syntaxhighlight lang="python" >
from celery import Celery
app = Celery('tasks', broker='redis://redis:6379/0', backend='redis://redis:6379/0' )
# 设置定时任务,每隔30秒执行一次add任务
app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}
# 注册异步任务
@app.task
def add(x, y):
    return x + y
</syntaxhighlight>
Dockerfile是用于构建celery镜像,redis镜像直接使用官方。
<syntaxhighlight lang="Dockerfile" >
FROM python:alpine
# 安装celery和redis客户端
RUN pip install --no-cache-dir redis celery
WORKDIR /data
CMD ["celery"]
</syntaxhighlight>
docker compose组合管理容器
<syntaxhighlight lang="docker" >
version: '3'
services:
  redis:
    image: redis:alpine
    ports:
      - 6379:6379
     
  worker:
    build: ./
    entrypoint: celery
    command: -A tasks worker -l info -E
    user: nobody
    volumes:
      - ./:/data
    depends_on:
      - redis
</syntaxhighlight>
终端进入项目目录,使用<code>docker compose up</code>即可启动celery和redis服务,另开一个终端,使用<code>docker exec -it celery容器名称 python</code>命令进入容器并运行Python,调用add异步函数测试:
<syntaxhighlight lang="python" >
from tasks import add
add.delay(3,3)
</syntaxhighlight>
定时任务需要启用celery beat,定时发送到任务队列。打开一个终端,使用<code>docker exec -it celery容器名称 celery -A tasks beat</code>命令使容器启动celery beat
===常见错误===
* docker-compose.yml中<code>build: ./ </code>时,如果Dockerfile中使用pip安装,可能出现<code>Failed to establish a new connection: [Errno -3] Try again')':</code>错误。
将docker-compose.yml中<code>build: ./ </code>修改为:<syntaxhighlight lang="docker" >
    build:
        context: ./
        network: host
</syntaxhighlight>


==资源==
==资源==

2022年12月17日 (六) 17:56的最新版本

Celery是基于Python的一个开源的分布式实时任务队列工具,同时也支持任务调度。

简介

时间轴

  • 2009年04月24日,Ask Solem 发布Celery 0.1.0
  • 2010年02月10日,发布Celery 1.0.0
  • 2016年11月04日,发布Celery 4.0.0。配置文件引入小写字母设置和重命名了一些前缀。
  • 2020年09月24日,发布Celery 5.0.0
  • 2021年06月18日,发布Celery 5.1

了解更多 >> Celery 文档:历史


安装

使用pip安装:

pip install -U Celery

入门

基本架构

Celery架构是一种生产者消费者模型,生产者(调用异步任务或定时任务)将任务发送到消息中间件(RedisRabbitMQ等),消费者(任务执行单元)从消息中间件读取执行。

drawio: celery架构

基本概念

名称 描述

示例

异步任务

定时任务

Celery也支持定时任务,即周期性任务(Periodic Tasks)。celery beat定期发布任务,然后由celery worker执行。

了解更多 >> Celery 文档:周期性任务


定时

schedule用于设置执行的频率,可以是整数、timedelta、crontab、solar或自定义类型。一个示例:

from celery.schedules import crontab

beat_schedule = {
    # 每个周星期一早上7点30分执行 
    'add-every-monday-morning': {
        'task': 'tasks.task1',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),  
    },
}
类别 描述 示例
数字 表示秒 30.0 每30秒执行一次
timedelta Python的timedelta对象,用于表示时间间隔。
使用时需要导入timedelta:from datetime import timedelta
timedelta(seconds=30) 每30秒执行一次
crontab 能够方便设置每天或每星期什么时间执行。
使用时需要导入crontab:from celery.schedules import crontab
crontab() 每分钟执行
crontab(minute=0, hour=0) 每天0点0分执行
crontab(hour=7, minute=30, day_of_week=1) 每个周星期一早上7点30分执行
crontab(minute=0, hour='*/6') 每六个小时执行一次:午夜、早上 6 点、中午、下午 6 点
crontab(minute=0, hour='0,6,12,18')同前面
solar 太阳时间表,按照日出、日落、黎明或黄昏来执行。
使用时需要导入solar:from celery.schedules import solar
格式:solar(event, latitude, longitude)
'schedule': solar('sunset', -37.81753, 144.96715) 墨尔本日落时执行
自定义

启动

启动celery beat服务:

celery -A your_proj beat

通过-B,也可在启动celery worker服务时,同时启动celery beat服务。这种简单方便,但只适用于单个worker节点,因此不建议用于生产用途:

celery -A your_proj worker -B

了解更多 >> Celery 文档:周期性任务 - 启动调度器


配置

概览

配置一般写在单独的文件中。如果使用默认加载器,文件名称需要为celeryconfig.py。Celery 4.0 版引入了小写设置和重命名一些前缀,旧的配置命令将在Celery 6.0不可用。一个简单的配置文件:

# celery_task为项目目录,包含__init__.py、celeryconfig.py、task1.py、task2.py
# celery_task
# ├── __init__.py
# ├── celeryconfig.py
# ├── task1.py
# └── task2.py
# 以下为celeryconfig.py文件内容:

# 任务队列设置
broker_url = 'redis://127.0.0.1:6379'

# 任务转态和结果存储设置
result_backend = 'redis://127.0.0.1:6379/0'

# 指定时区,默认是 UTC
timezone = 'Asia/Shanghai'

# 导入任务模块
imports = (
    'celery_task.task1',
    'celery_task.task2'
    )

了解更多 >> Celery 文档:配置和默认值



Broker

名称 描述 示例
broker_url 默认消息队列URL。格式:
transport://userid:password@hostname:port/virtual_host
transport默认为amqp://,其他可选redis://sqs://qpid://
broker_url = 'redis://127.0.0.1:6379'使用redis作为消息队列。

日期时间

名称 描述 示例
enable_utc
timezone 时区,默认"UTC"

结果存储

监控和管理

概览

Celery几个常用监控管理工具:

名称 描述
命令
Flower Flower是Celery的实时网络监控和管理工具。

使用:
# 安装flower
pip install flower

# 为celery项目启动一个网络服务器,默认端口5555,http://localhost:5555/
celery -A celery项目 flower

# 浏览器访问flower,http://localhost:5555/
celery events 一个简单监视管理工具,在 Celery 2.0 版本新增。可以显示任务和worker的历史,以及查看任务的结果和追溯,它还支持一些管理命令,如限制速度和关闭worker。

使用:
celery -A celery项目 events

了解更多 >> Celery 文档:监控和管理


Flower

docker

使用docker compose可以快速搭建Flower测试环境,flower官方有配置好的Celery + Flower + Redis + prometheus + grafana测试环境。 以下配置Celery + Flower + Redis环境。

项目结构:

examples
├── docker-compose.yml
├── Dockerfile
└── examples
    ├── celeryconfig.py
    └── tasks.py

了解更多 >> Github.com/mher/flower/:docker-compose.yml


Docker

celery

使用docker可以快速搭建环境,如搭建一个简单测试环境celery + redis,项目目录如下:

example                        
├── docker-compose.yml     
├── Dockerfile
└── tasks.py

tasks.py用于配置Celery,注册异步函数

from celery import Celery

app = Celery('tasks', broker='redis://redis:6379/0', backend='redis://redis:6379/0' )

# 设置定时任务,每隔30秒执行一次add任务
app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}

# 注册异步任务
@app.task
def add(x, y):
    return x + y

Dockerfile是用于构建celery镜像,redis镜像直接使用官方。

FROM python:alpine

# 安装celery和redis客户端
RUN pip install --no-cache-dir redis celery

WORKDIR /data

CMD ["celery"]

docker compose组合管理容器

version: '3'
services:

  redis:
    image: redis:alpine
    ports:
      - 6379:6379
      
  worker:
    build: ./
    entrypoint: celery
    command: -A tasks worker -l info -E
    user: nobody
    volumes:
      - ./:/data
    depends_on:
      - redis

终端进入项目目录,使用docker compose up即可启动celery和redis服务,另开一个终端,使用docker exec -it celery容器名称 python命令进入容器并运行Python,调用add异步函数测试:

from tasks import add
add.delay(3,3)

定时任务需要启用celery beat,定时发送到任务队列。打开一个终端,使用docker exec -it celery容器名称 celery -A tasks beat命令使容器启动celery beat

常见错误

  • docker-compose.yml中build: ./ 时,如果Dockerfile中使用pip安装,可能出现Failed to establish a new connection: [Errno -3] Try again')':错误。

将docker-compose.yml中build: ./ 修改为:

    build: 
        context: ./
        network: host

资源

官网

相关教程

相关文章