Celery分布式任务队列

一、Celery介绍和使用: Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子:

  • 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。
  • 你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天 是客户的生日,就给他发个短信祝福
Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis,后面会讲
celery图示:

Celery分布式任务队列
文章图片
image.png
Celery有以下优点:
  • 简单:一旦熟悉了celery的工作流程后,配置和使用还是比较简单的
  • 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
  • 快速:一个单进程的celery每分钟可处理上百万个任务
  • 灵活: 几乎celery的各个组件都可以被扩展及自定制
Celery基本工作流程图:
Celery分布式任务队列
文章图片
image.png
user:用户程序,用于告知celery去执行一个任务。
broker: 存放任务(依赖RabbitMQ或Redis,进行存储)
worker:执行任务
二、基于redis实现的Celery
  • 本机安装redis
    brew install redis
  • 安装Celery:
    pip3 install celery
  • 后台启动redis服务
    redis-server
  • 创建一个celery对象和任务
    创建一个任务文件就叫tasks.py
from celery import Celeryapp = Celery('tasks', broker='redis://localhost', backend='redis://localhost')@app.task def my_task(x, y): print("running...", x, y) return x + y

  • 创建user.py用户程序
from tasks import my_task# 立即告知celery去执行my_task任务,并传入两个参数 result = my_task.delay(4, 4) print(result.id)

  • 启动Celery 创建Worker来开始监听并执行任务(要在项目目录里执行)
    celery -A tasks worker --loglevel=info
    在windows是不支持这个命令的,要安装 pip3 install eventle,然后执行:
    celery -A tasks worker --loglevel=info -P eventlet
Celery分布式任务队列
文章图片
屏幕快照 2018-11-30 上午1.08.54.png
  • 执行 user.py ,创建一个任务并获取任务ID:
    python3 user.py
Celery分布式任务队列
文章图片
屏幕快照 2018-11-30 上午1.22.00.png Celery分布式任务队列
文章图片
屏幕快照 2018-11-30 上午1.19.29.png
  • 查看任务执行情况(注意:代码中的id为执行任务时返回的id值)
from celery.result import AsyncResult from tasks import appasync = AsyncResult(id="3dd9081b-e62f-4085-84f4-36bba3255df6", app=app)if async.successful(): result = async.get() print(result) # result.forget() # 将结果删除 elif async.failed(): print('执行失败') elif async.status == 'PENDING': print('任务等待中被执行') elif async.status == 'RETRY': print('任务异常后正在重试') elif async.status == 'STARTED': print('任务已经开始被执行')

是不是还是很懵逼,感觉不知道怎么用在实际的业务中。请往下看,通过Flask实现一个抢购商品的模拟示例,你就领略了celery的强大之处!!!
步骤一:首先创建一个celery_task.py文件用来创建celery对象和task任务:
import time import random from celery import Celery# 创建celery对象 app = Celery('tasks', broker='redis://127.0.0.1:6379', backend='redis://127.0.0.1:6379')# 创建任务 @app.task def create_order(gid): time.sleep(10) v = random.randint(1,4) if v == 2: return '抢购成功' else: return '抢购失败'

步骤二:创建一个manage.py文件用来创建一个flask程序和相应的逻辑:
from flask import Flask, render_template, request from celery.result import AsyncResult # 异步获取结果 from celery_task import create_order # 导入任务 from celery_task import app as celery_app # 导入celery对象#实例化flask对象 app = Flask(__name__)GOODS_LIST = [ {'id': 1, 'title': '小米手机'}, {'id': 2, 'title': '小米手环'}, {'id': 3, 'title': '小米电视'}, ]@app.route('/goods') def goods(): return render_template('goods.html', gds=GOODS_LIST)@app.route('/buy') def buy(): gid = request.args.get('gid') # 获取前端用户要购买的商品id result = create_order.delay(gid) #执行抢购任务 return render_template('tips.html', task_id=result.id)@app.route('/check') def check(): task_id = request.args.get('task') #获得参数 async = AsyncResult(id=task_id, app=celery_app)#查看任务结果 if async.successful(): result = async.get()# 获取任务结果 return result else: return '还在排队等待中'if __name__ == '__main__': app.run()

步骤三:启动Celery 创建Worker来开始监听并执行任务(要在项目目录里执行) celery -A celery_task worker --loglevel=info
步骤四:启动flask程序,访问 http://127.0.0.1:5000/goods
Celery分布式任务队列
文章图片
屏幕快照 2018-12-01 下午8.46.31.png
步骤五:点击购买,实际上就是用户创建了一个任务,并返回一个任务ID,任务被丢进了redis队列中,等待执行任务的worker就会自动执行队列中的任务,请看redis中的数据:
Celery分布式任务队列
文章图片
屏幕快照 2018-12-01 下午8.53.20.png Celery分布式任务队列
文章图片
屏幕快照 2018-12-01 下午8.55.35.png
步骤六:点击点我,实际上是通过任务ID来查看任务执行的情况

Celery分布式任务队列
文章图片
屏幕快照 2018-12-01 下午8.58.02.png
从这张截图的URL参数可以看出任务ID和redis中的任务ID是一样的,末尾都是 22c8
【Celery分布式任务队列】这样就使用celery简单实现了商品抢购的业务示例,有没有清楚一些!!!
三、Celery的定时任务 celery支持定时任务,设定好任务的执行时间,celery就会定时自动帮你执行, 这个定时任务模块叫celery beat
Celery 中启动定时任务有两种方式,(1)在程序中指定(2)在配置文件中指定
1、在程序中指定 写一个脚本periodic_task.py文件:
from celery import Celery from celery.schedules import crontabapp = Celery("tasks", broker='redis://localhost',# 消息代理 backend='redis://localhost',# 结果存储 )@app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # Calls test('hello') every 10 seconds. sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')# Calls test('world') every 30 seconds sender.add_periodic_task(30.0, test.s('world'), expires=10)# Executes every Monday morning at 7:30 a.m. sender.add_periodic_task( crontab(hour=7, minute=30, day_of_week=1), test.s('Happy Mondays!'), )@app.task def test(arg): print(arg)

add_periodic_task 会添加一条定时任务
任务添加好了,需要让celery单独启动一个进程来定时发起这些任务, 注意, 这里是发起任务,不是执行,这个进程只会不断的去检查你的任务计划, 每发现有任务需要执行了,就发起一个任务调用消息,交给celery worker去执行
  • 启动任务调度器 celery beat(cd 到文件目录)
    celery -A periodic_task beat
Celery分布式任务队列
文章图片
屏幕快照 2018-12-01 下午10.23.14.png
此时还差一步,就是还需要启动一个worker,负责执行celery beat发起的任务
  • 启动celery worker来执行任务(cd 到文件目录)
    celery -A periodic_task worker
Celery分布式任务队列
文章图片
屏幕快照 2018-12-01 下午10.25.35.png
此时观察worker的输出,是不是每隔一小会,就会执行一次定时任务!
2、在配置文件中指定
  • 首先看一下文件书写的方式:
Celery分布式任务队列
文章图片
屏幕快照 2018-12-02 上午1.28.44.png
celery_task/_init_.py
# 拒绝隐式引入,如果celery.py和celery模块名字一样,避免冲突,需要加上这条语句 # 该代码中,名字是不一样的,最好也要不一样 from __future__ import absolute_import from celery import Celeryapp = Celery('tasks', broker='redis://localhost',# 消息代理 backend='redis://localhost',# 结果存储 )app.config_from_object('celery_task.config')

config.py
from __future__ import absolute_import from celery.schedules import crontab from datetime import timedelta# 使用redis存储任务队列 broker_url = 'redis://127.0.0.1:6379/7' # 使用redis存储结果 result_backend = 'redis://127.0.0.1:6379/8'task_serializer = 'json' result_serializer = 'json' accept_content = ['json'] # 时区设置 timezone = 'Asia/Shanghai' # celery默认开启自己的日志 # False表示不关闭 worker_hijack_root_logger = False # 存储结果过期时间,过期后自动删除 # 单位为秒 result_expires = 60 * 60 * 24# 导入任务所在文件 imports = [ 'celery_task.tasks.task', ]# 需要执行任务的配置 beat_schedule = { 'test1': { # 具体需要执行的函数 # 该函数必须要使用@app.task装饰 'task': 'celery_task.tasks.task.remove', # 定时时间 # 每分钟执行一次,不能为小数 'schedule': crontab(minute='*/1'), # 或者这么写,每小时执行一次 # "schedule": crontab(minute=0, hour="*/1") # 执行的函数需要的参数 'args': ("hello",) }, }

task.py
from __future__ import absolute_import, unicode_literals from celery_task import app@app.task def remove(path):print(path) # to do anything return True

  • 启动任务调度器 celery beat(cd 到文件目录)
    celery -A celery_task beat
Celery分布式任务队列
文章图片
屏幕快照 2018-12-02 上午1.33.14.png
  • 启动celery worker来执行任务(cd 到文件目录)
    celery -A celery_task worker -l info
Celery分布式任务队列
文章图片
屏幕快照 2018-12-02 上午1.34.10.png
Django中使用celery 定时任务

    推荐阅读