Open-falcon semaphore的源码实现

semaphore即信号量,Open-falcon在将queue中的数据发送给graph时,用semaphore控制并发的goroutine数量,避免产生大量的发送goroutine。

func forward2GraphTask(Q *list.SafeListLimited, node string, addr string, concurrent int) { sema := nsema.NewSemaphore(concurrent) for { items := Q.PopBackBy(batch) sema.Acquire() go func(addr string, graphItems []*cmodel.GraphItem, count int) { defer sema.Release() err = GraphConnPools.Call(addr, "Graph.Send", graphItems, resp) .... } .... } }

semaphore使用channel实现:
  • 定义一个len的channel;
  • 当申请semaphore时,入队channel一个元素,当超过len时,阻塞住,也就是申请失败;
  • 当释放semaphore是,出队channel一个元素;
type Semaphore struct { bufSize int channel chan int8 }func NewSemaphore(concurrencyNum int) *Semaphore { return &Semaphore{channel: make(chan int8, concurrencyNum), bufSize: concurrencyNum} }

申请semaphore:
//channel入队成功,返回true;否则返回false; func (this *Semaphore) TryAcquire() bool { select { case this.channel <- int8(0): return true default: return false } }//若channel未满,入队返回;否则,阻塞 func (this *Semaphore) Acquire() { this.channel <- int8(0) }

释放semaphore: channel出队一个元素
func (this *Semaphore) Release() { <-this.channel }

【Open-falcon semaphore的源码实现】查询可用的semaphore:
func (this *Semaphore) AvailablePermits() int { return this.bufSize - len(this.channel) }

    推荐阅读