Open-falcon transfer源码解读

transfer可以理解为中转模块,它接收agent上报的指标,然后转发给后端的graph和judge实例。
transfer接收到agent上报的指标后,先存储到内存queue,然后再由goroutine默默的将queue的数据Pop出来,转发给graph和judge。
transfer后端接多个graph和judge实例,如何保证某一个指标稳定的转发到某个实例,同时还能保证多个graph间保持均衡,不会出现某个graph承担过多的指标而产生数据倾斜?transfer使用了一致性hash算法来做到这一点。
整体架构:
Open-falcon transfer源码解读
文章图片

1. transfer接收agent上报的指标数据 transfer通过TCP RPC接收agent的数据:

// modules/transfer/receiver/rpc/rpc.go func StartRpc() { listener, err := net.ListenTCP("tcp", tcpAddr) server := rpc.NewServer() server.Register(new(Transfer)) for { conn, err := listener.AcceptTCP() go server.ServeCodec(jsonrpc.NewServerCodec(conn)) } }

transfer的RPC方法:Transfer.Update,负责接收数据
//modules/transfer/receiver/rpc/rpc_transfer.go type Transfer int func (t *Transfer) Update(args []*cmodel.MetricValue, reply *cmodel.TransferResponse) error { return RecvMetricValues(args, reply, "rpc") } func RecvMetricValues(args []*cmodel.MetricValue, reply *cmodel.TransferResponse, from string) error { items := []*cmodel.MetaData{} for _, v := range args { fv := &cmodel.MetaData{ Metric:v.Metric, Endpoint:v.Endpoint, Timestamp:v.Timestamp, Step:v.Step, CounterType: v.Type, Tags:cutils.DictedTagstring(v.Tags), } ....... items = append(items, fv) } if cfg.Graph.Enabled { sender.Push2GraphSendQueue(items) } if cfg.Judge.Enabled { sender.Push2JudgeSendQueue(items) } }

可以看到,transfer直接将items放入graph/judge中的Queue就返回了,并不会直接发送;这样做有以下好处:
  • 更快的响应agent;
  • 把零散的数据做成恒定大小的批次,再发送给后端,减轻对后端实例的冲击;
  • 将数据缓存以后,可以从容的处理发送超时等异常情况;
以graph为例,分析如何确定某个item放入那个graph Queue:
// modules/transfer/sender/sender.go func Push2GraphSendQueue(items []*cmodel.MetaData) { for _, item := range items { pk := item.PK() //根据item的key确定放入那个graph Queue node, err := GraphNodeRing.GetNode(pk)//将数据Push进queue for _, addr := range cnode.Addrs { Q := GraphQueues[node+addr] if !Q.PushFront(graphItem) { errCnt += 1 } } } }

可以看出,根据item的key确定graphQueue,而key是将endpoint/metric/tags信息拼成了一个字符串:
func (t *MetaData) PK() string { return MUtils.PK(t.Endpoint, t.Metric, t.Tags) } func PK(endpoint, metric string, tags map[string]string) string { ret := bufferPool.Get().(*bytes.Buffer) ret.Reset() defer bufferPool.Put(ret)if tags == nil || len(tags) == 0 { ret.WriteString(endpoint) ret.WriteString("/") ret.WriteString(metric)return ret.String() } ret.WriteString(endpoint) ret.WriteString("/") ret.WriteString(metric) ret.WriteString("/") ret.WriteString(SortedTags(tags)) return ret.String() }

2. 一致性hash保证graph/judge间的数据均衡 itemKey是个string,如何确定将该item放入哪个graphQueue呢?
答案是一致性hash算法,根据itemKey通过一致性hash确定一个node,然后每个node对应1个graphQueue。
这里重点关注一致性hash如何使用:
//根据pk选择node node, err := GraphNodeRing.GetNode(pk)`//创建Graph节点的hash环 GraphNodeRing = rings.NewConsistentHashNodesRing(int32(cfg.Graph.Replicas), cutils.KeysOfMap(cfg.Graph.Cluster))

【Open-falcon transfer源码解读】使用graph节点创建graph节点的hash环,每个节点有replica个虚拟节点,以保证数据均衡;
这里的一致性hash使用了github.com/toolkits/consistent/rings的开源实现:
//创建hash环 func NewConsistentHashNodesRing(numberOfReplicas int32, nodes []string) *ConsistentHashNodeRing { ret := &ConsistentHashNodeRing{ring: consistent.New()} ret.SetNumberOfReplicas(numberOfReplicas) ret.SetNodes(nodes) return ret }

// 根据pk,获取node节点. chash(pk) -> node func (this *ConsistentHashNodeRing) GetNode(pk string) (string, error) { return this.ring.Get(pk) }

item选到node以后,就被push到该node对应的graphQueue,最后graphQueue的数据被RPC发送给graph节点:
GraphQueues= make(map[string]*nlist.SafeListLimited)Q := GraphQueues[node+addr] if !Q.PushFront(graphItem) { errCnt += 1 }

其中nlist.SafeListLimited是用list封装的一个queue结构:
// SafeList with Limited Size type SafeListLimited struct { maxSize int SL*SafeList }type SafeList struct { sync.RWMutex L *list.List }

3. queue中的数据转发给graph/judge 每个graph节点对应一个graphQueue,需要被TCP RPC发送给graph节点,这由后台的goroutine来执行的:
// modules/transfer/sender/send_tasks.go func startSendTasks() { ...... for node, nitem := range cfg.Graph.ClusterList { for _, addr := range nitem.Addrs { queue := GraphQueues[node+addr] go forward2GraphTask(queue, node, addr, graphConcurrent) } } ...... }func forward2GraphTask(Q *list.SafeListLimited, node string, addr string, concurrent int) { batch := g.Config().Graph.Batch sema := nsema.NewSemaphore(concurrent) for { items := Q.PopBackBy(batch) count := len(items) if count == 0 { time.Sleep(DefaultSendTaskSleepInterval) continue } sema.Acquire() go func(addr string, graphItems []*cmodel.GraphItem, count int) { defer sema.Release() err = GraphConnPools.Call(addr, "Graph.Send", graphItems, resp) }(addr, graphItems, count) } }

每个graph节点启动1个goroutine进行发送;发送过程中,每来一个batch就启动1个goroutine进行发送,为控制发送的goroutine数量,使用semaphore(channel实现)控制并发。
具体的发送过程:为每个graph创建了1个rpc连接池,发送时先从池中Fetch一个连接,然后使用这个conn调用rpcClient.Call()完成发送:
// common/backend_pool/rpc_backends.go func (this *SafeRpcConnPools) Call(addr, method string, args interface{}, resp interface{}) error { connPool, exists := this.Get(addr) //先获取一个连接 conn, err := connPool.Fetch() rpcClient := conn.(*rpcpool.RpcClient) done := make(chan error, 1) go func() { //具体的rpc发送 done <- rpcClient.Call(method, args, resp) }() // select timeout进行超时控制 select { case <-time.After(callTimeout): connPool.ForceClose(conn) return fmt.Errorf("%s, call timeout", addr) case err = <-done: connPool.Release(conn) return err } }

connPool的实现也很简单,内部维护了一个maxConns和maxIdle个数;每次fetch的时候,判断是否有空闲连接,若有空闲则直接返回,否则new一个新的:
// common/backend_pool/rpc_backends.go func createOneRpcPool(name string, address string, connTimeout time.Duration, maxConns int, maxIdle int) *connp.ConnPool { p := connp.NewConnPool(name, address, int32(maxConns), int32(maxIdle)) p.New = func(connName string) (connp.NConn, error) { _, err := net.ResolveTCPAddr("tcp", p.Address) if err != nil { return nil, err } conn, err := net.DialTimeout("tcp", p.Address, connTimeout) if err != nil { return nil, err } return rpcpool.NewRpcClient(rpc.NewClient(conn), connName), nil } return p }

ConnPool实现在github.com/toolkits/conn_pool中:
//github.com/toolkits/conn_pool/conn_pool.go func (this *ConnPool) Fetch() (NConn, error) { this.Lock() defer this.Unlock()// get from free conn := this.fetchFree() if conn != nil { return conn, nil }if this.overMax() { return nil, ErrMaxConn }// create new conn conn, err := this.newConn() if err != nil { return nil, err }this.increActive() return conn, nil }

    推荐阅读