Celery知识点总结

须知少年凌云志,曾许人间第一流。这篇文章主要讲述Celery知识点总结相关的知识,希望能为你提供帮助。
本文内将主要介绍Celery的相关知识点,其中会涉及到其架构原理、重要功能讲解、相关配置以及使用技巧等。本文适合的阅读人群为在使用编写项目的主语言为python且需要快速实现异步架构的开发者。笔者也会将自己的理解在文中进行阐述,这也算是在和大家交流心得的一个过程。若文中有错误的理解和概念,请大家及时纠正;吸纳大家的建议,对于我来说也是很重要的学习过程之一。
1. 架构Celery是基于Python开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。

Celery知识点总结

文章图片

其中,Celery的任务队列支持使用多种介质来实现,例如Redis, RabbitMQ和Amazon SQS等。通过使用其自带的任务调度器还可快速实现定时任务推送的功能。
2. 任务路由Celery中的任务路由是由Exchange和Queue这两个概念来实现的。
Celery知识点总结

文章图片

Celery可以在一个Broker上面开辟多个Queue,每个Queue绑定指定类型的任务;对应的Worker通过指定的队列获取任务;Exchange用来决定哪些任务发送到哪些Queue中。Celery结合了上述的这些功能从而实现了任务路由。本章节将会对于这两个概念展开介绍。
2.1 Exchange发布的任务首先会经过Exchange。Exchange会根据设定条件将任务路由到不同的Queue中。不同类型的Exchange有着不同的路由规则。Exchange的类型有如下几类:
  1. Direct Exchange
    直接交换,即指定一个任务被哪个队列接收。哪一个Queue绑定了任务中带有的Routing key,那么Exchange就会将任务直接转发给这个Queue。
  2. Topic Exchange
    Topic Exchange会使用通配符来匹配Routing key。Routing key的格式需要为使用“.”(点)符号来分割,通配符支持使用"#"(用于匹配0或1个字符)和"*"(用于匹配多个字符)。每一个Queue都会绑定一个通配公式,当任务中带有的Routing key满足通配公式时,则Exchange会将任务发送到该Queue中。这种类型的Exchange可以满足按需路由任务的需求。
在一个架构中,可以包含多个Exchange,每一个Exchange管理着不同的Queue与Worker(即真正执行Task逻辑的对象)。
2.2 Queue当确定好Exchange的类型后,还需要创建所需的Queue,并将Queue绑定到相应的Exchange上。因此,在定义Queue的同时,还需要定义Queue与Exchange的绑定关系。定义Queue的配置如下所示:
CELERY_QUEUES = Queue(queue01,Exchange(ex01,type=topic), routing_key=*.task.*), Queue(queue02, Exchange(ex01,type=topic), routing_key=*.*.email), Queue(queue03, Exchange(ex01,type=topic), routing_key=*.add)

其中,Exchange(ex01,type=topic)表示定义了一个名为ex01的Exchange,其类型为Topic Exchange。其次,Queue(queue01,Exchange(ex01,type=topic), routing_key=.task.)表示定义了一个名叫queue01的队列,该列队绑定在了Exchange ex01上,并且只有当Routing key满足通配公式.task.时,ex01才会将任务转发到queue01中。
为了系统可靠性,还可以配置默认的Queue,Exchange以及Routing key;以保证任务必定会被处理。其相关的配置项如下:
default_exchange = Exchange(default, type=topic) app.conf.task_default_queue = Lost app.conf.task_default_exchange = default app.conf.task_default_routing_key = lost

上述给出的示例,使用的是编码形式的配置方式。Celery同时还支持使用配置文件的方式进行配置,这里笔者也给出该种方式的Exchange与Queue的定义细节:
# Queues and Route Config task_default_exchange = "tsmp_topic_exchange" task_default_exchange_type = "topic" task_default_queue = "Lost" task_default_routing_key = "lost" task_queues = "Alert": "exchange": "tsmp_topic_exchange", "exchange_type": "topic", "routing_key": "alert" , "Email": "exchange": "tsmp_topic_exchange", "exchange_type": "topic", "routing_key": "email"

使用配置文件的方式进行定义,往往能够满足那些需要统一声明,统一配置的需求场景
3. 任务在Celery中,由于传递的是任务(Task);因此构造Task便成为了Celery的另一大重点。本章节中会介绍Task的一些相关概念以及编写技巧。
3.1 任务注册当Task方法实现后,需要使用app.task()装饰器来装饰Task方法。app.task()装饰器负责在应用任务注册表中注册你的任务。当任务被发送时,没有实际函数的代码被发送,只是将要执行任务对应的方法名称以及参数发送到Broker。当Worker收到消息时,它会在任务注册表中查找任务名称并使用指定参数执行相关方法。因此,任务发布者和Worker都应该有完整的Task源码。如果代码需要更新,则两者的源码都应同步更新。
3.2 自定义Task类在Celery内部,是将Task看作为一个对象。即Celery实际上对Task这一概念进行了抽象
多数任务都有着类似的内部逻辑时,或任务中逻辑过于复杂时;此时可以考虑将Task的内容封装到一个Task类中。之后,将自定义Task类作为app.task()装饰器中base的参数值后,就可以在Task的相应方法中调用自定义Task类中的方法了。例如:
from celery.utils.log import get_task_loggerfrom cdm_celery import celery_config from cdm_celery.celery import app from cdm_celery.task.base.dns import CloudDomainServiceConsumer from factory.route53 import Route53Factorylogger = get_task_logger(__name__)@app.task(base=CloudDomainServiceConsumer, bind=True, retry_backoff=True) def create_route53_record(self, type=, domain=, records=[]): factory = Route53Factory( access_key=celery_config.route53[access_key], secret_access_key=celery_config.route53[secret_access_key], domain=domain ) self.set_factory(factory) self.set_service_name(Route53) self.set_action_type(添加DNS记录)task_result = self.create_record(type, domain, records) # 调用自定义Task类方法return task_result

除了将自身的业务需求封装在自定义Task类中,还可以重写一些celery.Task类本身的方法逻辑。例如通过重写< br/> on_success、on_failure以及on_retry这3个方法可以实现Task的回调接口。例如:
from celery import Taskclass CloudDomainServiceConsumer(Task):def on_success(self, retval, task_id, args, kwargs): """任务执行成功回调方法 """ logger.info(log_msg_to_json_str( msg=执行成功, data=https://www.songbingjia.com/android/args: args, kwargs: kwargs, celery_task_id=task_id ))self.alert(data=retval) # 发送通知邮件def on_failure(self, exc, task_id, args, kwargs, einfo):"""任务执行失败回调方法 """ logger.error(log_msg_to_json_str( msg=执行失败; exc_type:0; exc_details:1; celery_info:2.format( type(exc), exc, einfo, ), data=https://www.songbingjia.com/android/args: args, kwargs: kwargs, celery_task_id=task_id ))# 发送通知邮件 retval = action_type: self.get_action_type(), service: self.get_service_name(), status: 添加失败, query_params: str(exc)self.alert(data=retval)def on_retry(self, exc, task_id, args, kwargs, einfo):"""任务重试回调方法 """ logger.warning(log_msg_to_json_str( msg=重试; exc_type:0; exc_details:1; celery_info:2.format( type(exc), exc, einfo, ), data=https://www.songbingjia.com/android/args: args, kwargs: kwargs, celery_task_id=task_id ))

3.3 幂等性Task的幂等性是需要开发者自己实现的,Celery是无法检测Task逻辑是否是幂等的。为了保证异步数据一致性,往往都需要我们将Task实现为支持幂等性的。
3.4 Task函数之前的章节提到过,如果需要将指定Task注册到Celery队列中,需要使用app.task()装饰器来装饰Task方法。这其实也是将普通函数声明为Celery Task的方法。通过3.2章节的介绍,我们得知Celery将Task抽象为类对象,因此celery.Task类的实例属性和方法是可以在Task方法中使用的。只需要将装饰器方法中bind参数定义为True;之后就可以在Task方法中调用app.Task.request的属性和方法了。
被装饰的Task方法第一个参数必须是self。该self为celery.app.task对象的实例,此时若自定义了Task类,则该self所指的就是自定义Task类的对象实例
3.5 日志Celery使用的也是Python的标准logging模块;因此具体的配置与使用配置logging模块大致相同。不同只是在配置入口时,需要使用相应的装饰器来获取Celery的logger对象。这里介绍一些关于自定义Celery日志的方法。
  1. 关闭celery默认日志
    使用celery.signals.setup_logging。
    import celery

@celery.signals.setup_logging.connect
def on_setup_logging(**kwargs):
pass
2. 自定义celery自身日志(非task日志) 使用装饰器@celery.signals.after_setup_logger.connect装饰自定义日志格式方法。3. 自定义Task日志 使用装饰器@celery.signals.after_setup_task_logger.connect装饰自定义日志格式方法。 ```python import celery@celery.signals.after_setup_task_logger.connect def on_after_setup_task_logger(logger, *args, **kwargs): logger.setLevel(logging.INFO) set_logger_file_handler(os.path.join(BASE_DIR, log/celery/task/info.log), custom_logger=logger, level=logging.INFO) set_logger_file_handler(os.path.join(BASE_DIR, log/celery/task/error.log), custom_logger=logger, level=logging.ERROR) set_logger_file_handler(os.path.join(BASE_DIR, log/celery/task/warning.log), custom_logger=logger, level=logging.WARNING)

对于Celery日志的自定义,笔者有一些实践经验想分享给大家。由于Celery是一个异步任务框架,因此正常情况下会有多个Worker来并行处理任务。由于Python的logging模块对于多进程打印日志支持的不好,因此如果多个Worker往同一批日志文件中输出日志(仅针对单台服务器上启动多个Worker的情况),则会导致日志输出混乱。此时好的做法是为每一个Worker建立起独立的日志文件,例如可以使用Worker名称或进程号来命名等。命名时可以使用统一的命名前缀或后缀,这样也方便与后期的日志采集工作。
3.6 任务重试任务重试有两种定义方式:
  1. 手动定义
    需要自行在Task的逻辑中使用try except来抓取错误,并调用Task的retry()方法来进行任务重试。同时,可以为task设置一个最大重试次数max_retries; 并且还可通过default_retry_delay来定义重试的时间间隔(延迟时间的单位是秒)。
    注意: retry()方法返回的是一个错误,因为需要向上层抛出(raise)出去。
  2. 自动重试
    使用autoretry_for参数即可,表示当出现任何错误的时候都将重试。还可以通过Task.retry_backoff来设置延迟重试,同时还可以设置延迟因子Task.retry_jitter来控制延迟的间隔。Task.max_retries用来设置重试的最大次数。
    例如:
    @app.task(autoretry_for=(Exception,)) def task(): pass

3.7 实时获取Task状态
  • 对于获取Task状态,这里提供一个思路供大家参考:
3.8 相关配置在编写完相应的Task后,还需要配置如下一些配置才可以进行使用:
  1. broker
    要指定代理者(任务队列);用来承载需要处理的任务。
  2. backend (仅Worker使用)
    Worker执行完相应Task后做产生的result将会保存在backend中。以便与后续的分析和任务状态确认。
    例如:
    # Result Backend Config BASE_DIR = Path(__file__).resolve().parent.parent result_backend = .join([file://, os.path.join(BASE_DIR, cdm_celery/result)])

  3. 任务注册
    在调用Task的时候需要使用app对象的task属性来调用相应的任务,而task属性中就是使用任务注册表中来查找哪些任务可以被调用。Worker接收到执行任务信息后,也先会使用app对象来查询要执行的task对应的源代码是哪个,查询的依据就是任务注册表。
    因此Task在使用前需要进行任务注册,即自定义Task的信息都需要填写在待注册列表中,以便Celery在启动的时候将这些Task都注册到任务注册表中。
    例如:
    imports = ( cdm_celery.task.route53.tasks, cdm_celery.task.email.tasks, cdm_celery.task.dnspod.tasks, cdm_celery.task.qywx.tasks )

3.9 任务调用链一般,我们都会将所需的业务逻辑或计算任务封装到Task中。当一个任务过于繁杂时,可能需要将其拆分为多个子任务。往往,这些子任务还存在一定的执行顺序。面对这个需求时,可以通过Task调用Task的方式,来将这些子任务串连起来,达到顺序执行的效果。这里笔者提供一个曾经实现过的思路,供大家学习和思考。
通过结合上述的任务调用链和反射概念,可以实现动态配置任务调用链的功能。即在无需更改原代码的情况下可以快速在链中增加,修改或删除任务。
3.9.1 反射
通过反射操作,可以实现动态加载Task的功能,这为后续维护任务增加了很大的便利性。其实现的思路如下:
  1. 首先,在一些全局的配置中建立反射任务队列
    alert_actions = [ cdm_celery.task.email.tasks.send_email, cdm_celery.task.qywx.tasks.qywx_alert ]

  2. 其次,利用Python的相关模块实现反射逻辑
    from importlib import import_module

def modules_extract(module_names=None):
if not isinstance(module_names, Iterable):
raise ValueError(" 需要提供包含了模块全路径名称字符串的可迭代对象" )
for m in module_names: obj = CloudDomainServiceConsumer.module_extract(m) yield obj

### 3.9.2 Task调用Task 在相关的逻辑中加载反射任务队列和调用反射逻辑。同时,`通过使用Celery Task signature的概念来调用相关Task`即可。 ```python def alert(self, data=https://www.songbingjia.com/android/None): for alert_task in CloudDomainServiceConsumer.modules_extract(alert_actions): alert_task.signature( (), data, exchange=app.conf.task_queues[Alert][exchange], routing_key=app.conf.task_queues[Alert][routing_key] ).apply_async()

4. 配置对于Celery的使用配置,笔者在这里想提醒大家注意一点: 4.0版本以后启用了新的小写的配置项名称。
配置文件中不能同时出现老版本的CELERY开头大写和新的小写配置项名,建议统一使用新的小写配置项名
5. 小彩蛋这里之所以说是彩蛋,是因为这个章节所介绍的不是Celery的相关知识点。只是笔者在日常工作和学习中偶然碰到的与Celery相关的一个知识点,觉得很有意思,想给大家分享一下而已。
我们经常会在一些业务或异步任务中添加重试机制的相关逻辑,业界使用最多的重试机制实现方式为指数退避算法。这里先对指数退避算法做一个简单的介绍:
随着重传次数的增加,延迟的程度也会指数增长。说的通俗点,每次重试的时间间隔都是上一次的两倍。
注意: 每次延长的是时间区间的最大值,并且每次的真正需要等待的时间是随机从时间区间中取出的。
恰巧,在Celery中有指数退避算法的相关实现(celery/utils/time.py中):
def get_exponential_backoff_interval( factor, retries, maximum, full_jitter=False ): """Calculate the exponential backoff wait time.""" # Will be zero if factor equals 0 countdown = factor * (2 ** retries) # Full jitter according to # https://www.awsarchitectureblog.com/2015/03/backoff.html if full_jitter: countdown = random.randrange(countdown + 1) # Adjust according to maximum wait time and account for negative values. return max(0, min(maximum, countdown))

【Celery知识点总结】大家可以模仿上述的代码来实现自需的指数退避算法。

    推荐阅读