基于Python的信号库|基于Python的信号库 Blinker
环境
【基于Python的信号库|基于Python的信号库 Blinker】Python 3.6.4简介
Blinker是一个基于Python的强大的信号库,支持一对一、一对多的订阅发布模式,支持发送任意大小的数据等等,且线程安全。
安装
pip install blinker
使用
signal
为单例模式
signal 使用了单例模式,允许代码的不同模块得到相同的signal,而不用互相传参。In [1]: from blinker import signalIn [2]: a = signal('signal_test')In [3]: b = signal('signal_test')In [4]: a is b
Out[4]: True
订阅信号 使用
.connect(func)
方法来订阅一个信号,当信号发布时,该信号的订阅者会执行func
。In [5]: def subscriber(sender):
...:print('Got a signal sent by {}'.format(sender))
...:In [6]: ready = signal('ready')In [7]: ready.connect(subscriber)
Out[7]:
发布信号 使用
.send()
方法来发布信号,会通知所有订阅者,如果没有订阅者则什么都不会发生。In [12]: class Processor(object):
...:
...:def __init__(self, name):
...:self.name = name
...:
...:def go(self):
...:ready = signal('ready')
...:ready.send(self)
...:print('Processing...')
...:complete = signal('complete')
...:complete.send(self)
...:
...:def __repr__(self):
...:return ''.format(self.name)
...:In [13]: processor_a = Processor('a')In [14]: processor_a.go()
Got a signal sent by Processing...
订阅指定的发布者
.connect()
方法接收一个可选参数sender
,可用于接收指定发布者的信号。In [18]: def b_subscriber():
...:print('Caught signal from peocessor_b')
...:In [19]: ready.connect(b_subscriber, sender=processor_b)
Out[19]: In [20]: processor_a.go()
Got a signal sent by Processing...In [21]: processor_b.go()
Got a signal sent by Caught signal from peocessor_b
Processing...
订阅者接收发布者传递的数据
除了之前的通过
.connect
方法来订阅外,还可以通过装饰器的方法来订阅。订阅的方法可以接收发布者传递的数据。
In [22]: send_data = https://www.it610.com/article/signal('send-data')In [23]: @send_data.connect
...: def receive_data(sender, **kw):
...:print('Caught signal from {}, data: {}'.format(sender, kw))
...:return 'received!'
...:
...: In [24]: result = send_data.send('anonymous', abc=123)
Caught signal from anonymous, data: {'abc': 123}
.send
方法的返回值是一个由元组组成的列表,每个元组的第一个值为订阅者的方法,第二个值为订阅者的返回值In [25]: result
Out[25]: [(, 'received!')]
匿名信号
信号可以是匿名的,可以使用
Signal
类来创建唯一的信号(S
大写,这个类不像之前的signal
,为非单例模式)。下面的
on_ready
和on_complete
为两个不同的信号In [28]: from blinker import SignalIn [29]: class AltProcessor(object):
...:on_ready = Signal()
...:on_complete = Signal()
...:
...:def __init__(self, name):
...:self.name = name
...:
...:def go(self):
...:self.on_ready.send(self)
...:print('Altername processing')
...:self.on_complete.send(self)
...:
...:def __repr__(self):
...:return ''.format(self.name)
通过装饰器来订阅
在订阅者接收发布者传递的数据中简单地演示了使用装饰器来订阅,但是那种订阅方式不支持订阅指定的发布者,这时候我们可以用
.connect_via(sender)
In [31]: @dice_roll.connect_via(1)
...: @dice_roll.connect_via(3)
...: @dice_roll.connect_via(5)
...: def odd_subscriver(sender):
...:print('Observed dice roll {}'.format(sender))
...:In [32]: result = dice_roll.send(3)
Observed dice roll 3In [33]: result = dice_roll.send(1)
Observed dice roll 1In [34]: result = dice_roll.send(5)
Observed dice roll 5In [35]: result = dice_roll.send(2)
检查信号是否有订阅者
In [37]: bool(signal('ready').receivers)
Out[37]: TrueIn [38]: bool(signal('complete').receivers)
Out[38]: FalseIn [39]: bool(AltProcessor.on_complete.receivers)
Out[39]: FalseIn [40]: signal('ready').has_receivers_for(processor_a)
Out[40]: True
参考
Blinker 官方文档
博客更新地址
- 宋明耀的博客 [ 第一时间更新 ]
- 知乎专栏 Python Cookbook
- 知乎专栏 DevOps - BetterWorld
- 流月0的文章
推荐阅读
- 热闹中的孤独
- JAVA(抽象类与接口的区别&重载与重写&内存泄漏)
- 放屁有这三个特征的,请注意啦!这说明你的身体毒素太多
- 一个人的旅行,三亚
- 布丽吉特,人生绝对的赢家
- 慢慢的美丽
- 尽力
- 一个小故事,我的思考。
- 家乡的那条小河
- 《真与假的困惑》???|《真与假的困惑》??? ——致良知是一种伟大的力量