Celery

Celery是基于Python的一个开源的分布式实时任务队列工具,同时也支持任务调度。

简介

时间轴

  • 2009年04月24日,Ask Solem 发布Celery 0.1.0
  • 2010年02月10日,发布Celery 1.0.0
  • 2016年11月04日,发布Celery 4.0.0。配置文件引入小写字母设置和重命名了一些前缀。
  • 2020年09月24日,发布Celery 5.0.0
  • 2021年06月18日,发布Celery 5.1

了解更多 >> Celery 文档:历史


安装

使用pip安装:

pip install -U Celery

入门

基本架构

Celery架构是一种生产者消费者模型,生产者(调用异步任务或定时任务)将任务发送到消息中间件(RedisRabbitMQ等),消费者(任务执行单元)从消息中间件读取执行。

drawio: celery架构

基本概念

名称 描述

示例

异步任务

定时任务

Celery也支持定时任务,即周期性任务(Periodic Tasks)。celery beat定期发布任务,然后由celery worker执行。

了解更多 >> Celery 文档:周期性任务


定时

schedule用于设置执行的频率,可以是整数、timedelta、crontab、solar或自定义类型。一个示例:

from celery.schedules import crontab

beat_schedule = {
    # 每个周星期一早上7点30分执行 
    'add-every-monday-morning': {
        'task': 'tasks.task1',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),  
    },
}
类别 描述 示例
数字 表示秒 30.0 每30秒执行一次
timedelta Python的timedelta对象,用于表示时间间隔。
使用时需要导入timedelta:from datetime import timedelta
timedelta(seconds=30) 每30秒执行一次
crontab 能够方便设置每天或每星期什么时间执行。
使用时需要导入crontab:from celery.schedules import crontab
crontab() 每分钟执行
crontab(minute=0, hour=0) 每天0点0分执行
crontab(hour=7, minute=30, day_of_week=1) 每个周星期一早上7点30分执行
crontab(minute=0, hour='*/6') 每六个小时执行一次:午夜、早上 6 点、中午、下午 6 点
crontab(minute=0, hour='0,6,12,18')同前面
solar 太阳时间表,按照日出、日落、黎明或黄昏来执行。
使用时需要导入solar:from celery.schedules import solar
格式:solar(event, latitude, longitude)
'schedule': solar('sunset', -37.81753, 144.96715) 墨尔本日落时执行
自定义

启动

启动celery beat服务:

celery -A your_proj beat

通过-B,也可在启动celery worker服务时,同时启动celery beat服务。这种简单方便,但只适用于单个worker节点,因此不建议用于生产用途:

celery -A your_proj worker -B

了解更多 >> Celery 文档:周期性任务 - 启动调度器


配置

概览

配置一般写在单独的文件中。如果使用默认加载器,文件名称需要为celeryconfig.py。Celery 4.0 版引入了小写设置和重命名一些前缀,旧的配置命令将在Celery 6.0不可用。一个简单的配置文件:

# celery_task为项目目录,包含__init__.py、celeryconfig.py、task1.py、task2.py
# celery_task
# ├── __init__.py
# ├── celeryconfig.py
# ├── task1.py
# └── task2.py
# 以下为celeryconfig.py文件内容:

# 任务队列设置
broker_url = 'redis://127.0.0.1:6379'

# 任务转态和结果存储设置
result_backend = 'redis://127.0.0.1:6379/0'

# 指定时区,默认是 UTC
timezone = 'Asia/Shanghai'

# 导入任务模块
imports = (
    'celery_task.task1',
    'celery_task.task2'
    )

了解更多 >> Celery 文档:配置和默认值



Broker

名称 描述 示例
broker_url 默认消息队列URL。格式:
transport://userid:password@hostname:port/virtual_host
transport默认为amqp://,其他可选redis://sqs://qpid://
broker_url = 'redis://127.0.0.1:6379'使用redis作为消息队列。

日期时间

名称 描述 示例
enable_utc
timezone 时区,默认"UTC"

结果存储

监控和管理

概览

Celery几个常用监控管理工具:

名称 描述
命令
Flower Flower是Celery的实时网络监控和管理工具。

使用:
# 安装flower
pip install flower

# 为celery项目启动一个网络服务器,默认端口5555,http://localhost:5555/
celery -A celery项目 flower

# 浏览器访问flower,http://localhost:5555/
celery events 一个简单监视管理工具,在 Celery 2.0 版本新增。可以显示任务和worker的历史,以及查看任务的结果和追溯,它还支持一些管理命令,如限制速度和关闭worker。

使用:
celery -A celery项目 events

了解更多 >> Celery 文档:监控和管理


Flower

docker

使用docker compose可以快速搭建Flower测试环境,flower官方有配置好的Celery + Flower + Redis + prometheus + grafana测试环境。 以下配置Celery + Flower + Redis环境。

项目结构:

examples
├── docker-compose.yml
├── Dockerfile
└── examples
    ├── celeryconfig.py
    └── tasks.py

了解更多 >> Github.com/mher/flower/:docker-compose.yml


Docker

celery

使用docker可以快速搭建环境,如搭建一个简单测试环境celery + redis,项目目录如下:

example                        
├── docker-compose.yml     
├── Dockerfile
└── tasks.py

tasks.py用于配置Celery,注册异步函数

from celery import Celery

app = Celery('tasks', broker='redis://redis:6379/0', backend='redis://redis:6379/0' )

# 设置定时任务,每隔30秒执行一次add任务
app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}

# 注册异步任务
@app.task
def add(x, y):
    return x + y

Dockerfile是用于构建celery镜像,redis镜像直接使用官方。

FROM python:alpine

# 安装celery和redis客户端
RUN pip install --no-cache-dir redis celery

WORKDIR /data

CMD ["celery"]

docker compose组合管理容器

version: '3'
services:

  redis:
    image: redis:alpine
    ports:
      - 6379:6379
      
  worker:
    build: ./
    entrypoint: celery
    command: -A tasks worker -l info -E
    user: nobody
    volumes:
      - ./:/data
    depends_on:
      - redis

终端进入项目目录,使用docker compose up即可启动celery和redis服务,另开一个终端,使用docker exec -it celery容器名称 python命令进入容器并运行Python,调用add异步函数测试:

from tasks import add
add.delay(3,3)

定时任务需要启用celery beat,定时发送到任务队列。打开一个终端,使用docker exec -it celery容器名称 celery -A tasks beat命令使容器启动celery beat

常见错误

  • docker-compose.yml中build: ./ 时,如果Dockerfile中使用pip安装,可能出现Failed to establish a new connection: [Errno -3] Try again')':错误。

将docker-compose.yml中build: ./ 修改为:

    build: 
        context: ./
        network: host

资源

官网

相关教程

相关文章