Open-falcon semaphore的源码实现
semaphore即信号量,Open-falcon在将queue中的数据发送给graph时,用semaphore控制并发的goroutine数量,避免产生大量的发送goroutine。
func forward2GraphTask(Q *list.SafeListLimited, node string, addr string, concurrent int) {
sema := nsema.NewSemaphore(concurrent)
for {
items := Q.PopBackBy(batch)
sema.Acquire()
go func(addr string, graphItems []*cmodel.GraphItem, count int) {
defer sema.Release()
err = GraphConnPools.Call(addr, "Graph.Send", graphItems, resp)
....
}
....
}
}
semaphore使用channel实现:
- 定义一个len的channel;
- 当申请semaphore时,入队channel一个元素,当超过len时,阻塞住,也就是申请失败;
- 当释放semaphore是,出队channel一个元素;
type Semaphore struct {
bufSize int
channel chan int8
}func NewSemaphore(concurrencyNum int) *Semaphore {
return &Semaphore{channel: make(chan int8, concurrencyNum), bufSize: concurrencyNum}
}
申请semaphore:
//channel入队成功,返回true;否则返回false;
func (this *Semaphore) TryAcquire() bool {
select {
case this.channel <- int8(0):
return true
default:
return false
}
}//若channel未满,入队返回;否则,阻塞
func (this *Semaphore) Acquire() {
this.channel <- int8(0)
}
释放semaphore: channel出队一个元素
func (this *Semaphore) Release() {
<-this.channel
}
【Open-falcon semaphore的源码实现】查询可用的semaphore:
func (this *Semaphore) AvailablePermits() int {
return this.bufSize - len(this.channel)
}
推荐阅读
- 热闹中的孤独
- JAVA(抽象类与接口的区别&重载与重写&内存泄漏)
- 放屁有这三个特征的,请注意啦!这说明你的身体毒素太多
- 一个人的旅行,三亚
- 布丽吉特,人生绝对的赢家
- 慢慢的美丽
- 尽力
- 一个小故事,我的思考。
- 家乡的那条小河
- 《真与假的困惑》???|《真与假的困惑》??? ——致良知是一种伟大的力量