基于Python的信号库|基于Python的信号库 Blinker

环境

【基于Python的信号库|基于Python的信号库 Blinker】Python 3.6.4
简介
Blinker是一个基于Python的强大的信号库,支持一对一、一对多的订阅发布模式,支持发送任意大小的数据等等,且线程安全。
安装
pip install blinker

使用
signal为单例模式 signal 使用了单例模式,允许代码的不同模块得到相同的signal,而不用互相传参。
In [1]: from blinker import signalIn [2]: a = signal('signal_test')In [3]: b = signal('signal_test')In [4]: a is b Out[4]: True

订阅信号 使用.connect(func)方法来订阅一个信号,当信号发布时,该信号的订阅者会执行func
In [5]: def subscriber(sender): ...:print('Got a signal sent by {}'.format(sender)) ...:In [6]: ready = signal('ready')In [7]: ready.connect(subscriber) Out[7]:

发布信号 使用.send()方法来发布信号,会通知所有订阅者,如果没有订阅者则什么都不会发生。
In [12]: class Processor(object): ...: ...:def __init__(self, name): ...:self.name = name ...: ...:def go(self): ...:ready = signal('ready') ...:ready.send(self) ...:print('Processing...') ...:complete = signal('complete') ...:complete.send(self) ...: ...:def __repr__(self): ...:return ''.format(self.name) ...:In [13]: processor_a = Processor('a')In [14]: processor_a.go() Got a signal sent by Processing...

订阅指定的发布者
.connect()方法接收一个可选参数sender,可用于接收指定发布者的信号。
In [18]: def b_subscriber(): ...:print('Caught signal from peocessor_b') ...:In [19]: ready.connect(b_subscriber, sender=processor_b) Out[19]: In [20]: processor_a.go() Got a signal sent by Processing...In [21]: processor_b.go() Got a signal sent by Caught signal from peocessor_b Processing...

订阅者接收发布者传递的数据
除了之前的通过.connect方法来订阅外,还可以通过装饰器的方法来订阅。
订阅的方法可以接收发布者传递的数据。
In [22]: send_data = https://www.it610.com/article/signal('send-data')In [23]: @send_data.connect ...: def receive_data(sender, **kw): ...:print('Caught signal from {}, data: {}'.format(sender, kw)) ...:return 'received!' ...: ...: In [24]: result = send_data.send('anonymous', abc=123) Caught signal from anonymous, data: {'abc': 123}

.send方法的返回值是一个由元组组成的列表,每个元组的第一个值为订阅者的方法,第二个值为订阅者的返回值
In [25]: result Out[25]: [(, 'received!')]

匿名信号
信号可以是匿名的,可以使用Signal类来创建唯一的信号(S大写,这个类不像之前的signal,为非单例模式)。
下面的on_readyon_complete为两个不同的信号
In [28]: from blinker import SignalIn [29]: class AltProcessor(object): ...:on_ready = Signal() ...:on_complete = Signal() ...: ...:def __init__(self, name): ...:self.name = name ...: ...:def go(self): ...:self.on_ready.send(self) ...:print('Altername processing') ...:self.on_complete.send(self) ...: ...:def __repr__(self): ...:return ''.format(self.name)

通过装饰器来订阅
在订阅者接收发布者传递的数据中简单地演示了使用装饰器来订阅,但是那种订阅方式不支持订阅指定的发布者,这时候我们可以用.connect_via(sender)
In [31]: @dice_roll.connect_via(1) ...: @dice_roll.connect_via(3) ...: @dice_roll.connect_via(5) ...: def odd_subscriver(sender): ...:print('Observed dice roll {}'.format(sender)) ...:In [32]: result = dice_roll.send(3) Observed dice roll 3In [33]: result = dice_roll.send(1) Observed dice roll 1In [34]: result = dice_roll.send(5) Observed dice roll 5In [35]: result = dice_roll.send(2)

检查信号是否有订阅者
In [37]: bool(signal('ready').receivers) Out[37]: TrueIn [38]: bool(signal('complete').receivers) Out[38]: FalseIn [39]: bool(AltProcessor.on_complete.receivers) Out[39]: FalseIn [40]: signal('ready').has_receivers_for(processor_a) Out[40]: True

参考
Blinker 官方文档
博客更新地址
  • 宋明耀的博客 [ 第一时间更新 ]
  • 知乎专栏 Python Cookbook
  • 知乎专栏 DevOps - BetterWorld
  • 流月0的文章

    推荐阅读