Celery分布式任务队列
一、Celery介绍和使用:
Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子:
- 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。
- 你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天 是客户的生日,就给他发个短信祝福
celery图示:Celery有以下优点:
文章图片
image.png
- 简单:一旦熟悉了celery的工作流程后,配置和使用还是比较简单的
- 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
- 快速:一个单进程的celery每分钟可处理上百万个任务
- 灵活: 几乎celery的各个组件都可以被扩展及自定制
二、基于redis实现的Celery
文章图片
image.png
user:用户程序,用于告知celery去执行一个任务。
broker: 存放任务(依赖RabbitMQ或Redis,进行存储)
worker:执行任务
- 本机安装redis
brew install redis
- 安装Celery:
pip3 install celery
- 后台启动redis服务
redis-server
- 创建一个celery对象和任务
创建一个任务文件就叫tasks.py
from celery import Celeryapp = Celery('tasks',
broker='redis://localhost',
backend='redis://localhost')@app.task
def my_task(x, y):
print("running...", x, y)
return x + y
- 创建user.py用户程序
from tasks import my_task# 立即告知celery去执行my_task任务,并传入两个参数
result = my_task.delay(4, 4)
print(result.id)
- 启动Celery 创建Worker来开始监听并执行任务(要在项目目录里执行)
celery -A tasks worker --loglevel=info
在windows是不支持这个命令的,要安装 pip3 install eventle,然后执行:
celery -A tasks worker --loglevel=info -P eventlet
文章图片
屏幕快照 2018-11-30 上午1.08.54.png
- 执行 user.py ,创建一个任务并获取任务ID:
python3 user.py
文章图片
屏幕快照 2018-11-30 上午1.22.00.png
文章图片
屏幕快照 2018-11-30 上午1.19.29.png
- 查看任务执行情况(注意:代码中的id为执行任务时返回的id值)
from celery.result import AsyncResult
from tasks import appasync = AsyncResult(id="3dd9081b-e62f-4085-84f4-36bba3255df6", app=app)if async.successful():
result = async.get()
print(result)
# result.forget() # 将结果删除
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')
是不是还是很懵逼,感觉不知道怎么用在实际的业务中。请往下看,通过Flask实现一个抢购商品的模拟示例,你就领略了celery的强大之处!!!步骤一:首先创建一个celery_task.py文件用来创建celery对象和task任务:
import time
import random
from celery import Celery# 创建celery对象
app = Celery('tasks', broker='redis://127.0.0.1:6379',
backend='redis://127.0.0.1:6379')# 创建任务
@app.task
def create_order(gid):
time.sleep(10)
v = random.randint(1,4)
if v == 2:
return '抢购成功'
else:
return '抢购失败'
步骤二:创建一个manage.py文件用来创建一个flask程序和相应的逻辑:
from flask import Flask, render_template, request
from celery.result import AsyncResult # 异步获取结果
from celery_task import create_order # 导入任务
from celery_task import app as celery_app # 导入celery对象#实例化flask对象
app = Flask(__name__)GOODS_LIST = [
{'id': 1, 'title': '小米手机'},
{'id': 2, 'title': '小米手环'},
{'id': 3, 'title': '小米电视'},
]@app.route('/goods')
def goods():
return render_template('goods.html', gds=GOODS_LIST)@app.route('/buy')
def buy():
gid = request.args.get('gid') # 获取前端用户要购买的商品id
result = create_order.delay(gid) #执行抢购任务
return render_template('tips.html', task_id=result.id)@app.route('/check')
def check():
task_id = request.args.get('task') #获得参数
async = AsyncResult(id=task_id, app=celery_app)#查看任务结果
if async.successful():
result = async.get()# 获取任务结果
return result
else:
return '还在排队等待中'if __name__ == '__main__':
app.run()
步骤三:启动Celery 创建Worker来开始监听并执行任务(要在项目目录里执行)
celery -A celery_task worker --loglevel=info
步骤四:启动flask程序,访问
http://127.0.0.1:5000/goods
步骤五:点击购买,实际上就是用户创建了一个任务,并返回一个任务ID,任务被丢进了redis队列中,等待执行任务的worker就会自动执行队列中的任务,请看redis中的数据:
文章图片
屏幕快照 2018-12-01 下午8.46.31.png
步骤六:点击
文章图片
屏幕快照 2018-12-01 下午8.53.20.png
文章图片
屏幕快照 2018-12-01 下午8.55.35.png
点我
,实际上是通过任务ID来查看任务执行的情况
【Celery分布式任务队列】这样就使用celery简单实现了商品抢购的业务示例,有没有清楚一些!!!
文章图片
屏幕快照 2018-12-01 下午8.58.02.png
从这张截图的URL参数可以看出任务ID和redis中的任务ID是一样的,末尾都是22c8
三、Celery的定时任务 celery支持定时任务,设定好任务的执行时间,celery就会定时自动帮你执行, 这个定时任务模块叫
celery beat
Celery 中启动定时任务有两种方式,(1)在程序中指定(2)在配置文件中指定
1、在程序中指定 写一个脚本periodic_task.py文件:
from celery import Celery
from celery.schedules import crontabapp = Celery("tasks",
broker='redis://localhost',# 消息代理
backend='redis://localhost',# 结果存储
)@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')# Calls test('world') every 30 seconds
sender.add_periodic_task(30.0, test.s('world'), expires=10)# Executes every Monday morning at 7:30 a.m.
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
test.s('Happy Mondays!'),
)@app.task
def test(arg):
print(arg)
add_periodic_task 会添加一条定时任务
任务添加好了,需要让celery单独启动一个进程来定时发起这些任务, 注意, 这里是发起任务,不是执行,这个进程只会不断的去检查你的任务计划, 每发现有任务需要执行了,就发起一个任务调用消息,交给celery worker去执行
- 启动任务调度器 celery beat(cd 到文件目录)
celery -A periodic_task beat
文章图片
屏幕快照 2018-12-01 下午10.23.14.png
此时还差一步,就是还需要启动一个worker,负责执行celery beat发起的任务
- 启动celery worker来执行任务(cd 到文件目录)
celery -A periodic_task worker
2、在配置文件中指定
文章图片
屏幕快照 2018-12-01 下午10.25.35.png
此时观察worker的输出,是不是每隔一小会,就会执行一次定时任务!
- 首先看一下文件书写的方式:
celery_task/_init_.py
文章图片
屏幕快照 2018-12-02 上午1.28.44.png
# 拒绝隐式引入,如果celery.py和celery模块名字一样,避免冲突,需要加上这条语句
# 该代码中,名字是不一样的,最好也要不一样
from __future__ import absolute_import
from celery import Celeryapp = Celery('tasks',
broker='redis://localhost',# 消息代理
backend='redis://localhost',# 结果存储
)app.config_from_object('celery_task.config')
config.py
from __future__ import absolute_import
from celery.schedules import crontab
from datetime import timedelta# 使用redis存储任务队列
broker_url = 'redis://127.0.0.1:6379/7'
# 使用redis存储结果
result_backend = 'redis://127.0.0.1:6379/8'task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
# 时区设置
timezone = 'Asia/Shanghai'
# celery默认开启自己的日志
# False表示不关闭
worker_hijack_root_logger = False
# 存储结果过期时间,过期后自动删除
# 单位为秒
result_expires = 60 * 60 * 24# 导入任务所在文件
imports = [
'celery_task.tasks.task',
]# 需要执行任务的配置
beat_schedule = {
'test1': {
# 具体需要执行的函数
# 该函数必须要使用@app.task装饰
'task': 'celery_task.tasks.task.remove',
# 定时时间
# 每分钟执行一次,不能为小数
'schedule': crontab(minute='*/1'),
# 或者这么写,每小时执行一次
# "schedule": crontab(minute=0, hour="*/1")
# 执行的函数需要的参数
'args': ("hello",)
},
}
task.py
from __future__ import absolute_import, unicode_literals
from celery_task import app@app.task
def remove(path):print(path)
# to do anything
return True
- 启动任务调度器 celery beat(cd 到文件目录)
celery -A celery_task beat
文章图片
屏幕快照 2018-12-02 上午1.33.14.png
- 启动celery worker来执行任务(cd 到文件目录)
celery -A celery_task worker -l info
Django中使用celery 定时任务
文章图片
屏幕快照 2018-12-02 上午1.34.10.png
推荐阅读
- 多线程NSOperation
- 深入浅出谈一下有关分布式消息技术(Kafka)
- linux定时任务contab
- 242为什么不断切换任务会更容易累()
- KubeDL HostNetwork(加速分布式训练通信效率)
- IOST任务教程
- 时间管理的任务模型
- 实操Redission|实操Redission 分布式服务
- 随便写写,完成任务?
- 分布式|《Python3网络爬虫开发实战(第二版)》内容介绍