Open-falcon hbs source code walkthrough

hbs periodically reads data from the DB, caches it locally, and exposes RPC interfaces that the agent and judge components call to query that data.

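```go
// modules/hbs/rpc/rpc.go
func Start() {
	// addr is the listen address; where it comes from is not shown in this excerpt
	server := rpc.NewServer()
	server.Register(new(Agent)) // methods called by agent
	server.Register(new(Hbs))   // methods called by judge

	l, e := net.Listen("tcp", addr)
	if e != nil {
		log.Fatalln("listen error:", e)
	}
	for {
		conn, err := l.Accept()
		// ......
		// each accepted connection is served in its own goroutine with the JSON-RPC codec
		go server.ServeCodec(jsonrpc.NewServerCodec(conn))
	}
}
```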
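To see the same serving pattern end to end, here is a minimal, self-contained sketch using only the standard `net/rpc` and `net/rpc/jsonrpc` packages. The `Agent.Echo` method and the `EchoArgs`/`EchoReply` types are hypothetical and exist only for illustration; an in-memory `net.Pipe` stands in for the TCP connection that `Start()` would accept.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
	"net/rpc/jsonrpc"
)

// EchoArgs and EchoReply are hypothetical request/response types.
type EchoArgs struct{ Hostname string }
type EchoReply struct{ Message string }

// Agent is a hypothetical service; the registered name is taken from the type name.
type Agent struct{}

// Echo has the signature net/rpc requires: (args T1, reply *T2) error.
func (t *Agent) Echo(args EchoArgs, reply *EchoReply) error {
	reply.Message = "hello " + args.Hostname
	return nil
}

// callOverPipe registers Agent, serves one in-memory connection with the
// JSON codec (as hbs does for each accepted TCP connection), and makes one call.
func callOverPipe(hostname string) (string, error) {
	server := rpc.NewServer()
	if err := server.Register(new(Agent)); err != nil {
		return "", err
	}
	serverConn, clientConn := net.Pipe()
	go server.ServeCodec(jsonrpc.NewServerCodec(serverConn))

	client := jsonrpc.NewClient(clientConn)
	defer client.Close() // closing the client ends ServeCodec on the other side

	var reply EchoReply
	if err := client.Call("Agent.Echo", EchoArgs{Hostname: hostname}, &reply); err != nil {
		return "", err
	}
	return reply.Message, nil
}

func main() {
	msg, err := callOverPipe("host-01")
	if err != nil {
		panic(err)
	}
	fmt.Println(msg) // prints "hello host-01"
}
```

Clients address methods as `"Service.Method"`, which is why hbs can register two services (`Agent` and `Hbs`) on one server and one port.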

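hbs serving agent:
  • handles agent heartbeat requests;
  • handles agent queries for which plugins to run;
  • handles agent queries for which processes and ports to monitor;
hbs serving judge:
  • returns all currently configured alarm strategies;
  • returns all currently configured alarm expressions;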
[Figure: overall hbs flow diagram]

1. hbs serving agent

The agent queries which plugins to run:
```go
// modules/hbs/rpc/agent.go
func (t *Agent) MinePlugins(args model.AgentHeartbeatRequest, reply *model.AgentPluginsResponse) error {
	if args.Hostname == "" {
		return nil
	}

	reply.Plugins = cache.GetPlugins(args.Hostname)
	reply.Timestamp = time.Now().Unix()

	return nil
}
```
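The excerpt does not show `cache.GetPlugins`. Since plugins are cached per group (`GroupPlugins` maps a group ID to plugin directories) and the refresh loop also maintains `HostMap` and `HostGroupsMap`, a plausible lookup is hostname → host ID → group IDs → plugins. The sketch below assumes exactly that; the three maps and their contents are hypothetical stand-ins, not the real cache types.

```go
package main

import (
	"fmt"
	"sort"
)

// Hypothetical cache snapshots, loosely modeled on HostMap, HostGroupsMap and GroupPlugins.
var (
	hostIDs    = map[string]int{"host-01": 1} // hostname -> host ID
	hostGroups = map[int][]int{1: {10, 20}}   // host ID -> group IDs
	groupPlugs = map[int][]string{            // group ID -> plugin dirs
		10: {"sys/ntp", "net/port"},
		20: {"net/port", "db/mysql"},
	}
)

// getPlugins resolves hostname -> host ID -> group IDs -> plugin dirs,
// de-duplicating plugins that several groups share.
func getPlugins(hostname string) []string {
	hid, ok := hostIDs[hostname]
	if !ok {
		return []string{}
	}
	seen := make(map[string]struct{})
	for _, gid := range hostGroups[hid] {
		for _, p := range groupPlugs[gid] {
			seen[p] = struct{}{}
		}
	}
	plugins := make([]string, 0, len(seen))
	for p := range seen {
		plugins = append(plugins, p)
	}
	sort.Strings(plugins) // deterministic order for the reply
	return plugins
}

func main() {
	fmt.Println(getPlugins("host-01")) // [db/mysql net/port sys/ntp]
	fmt.Println(getPlugins("unknown")) // []
}
```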

The agent queries which processes and ports to monitor:
```go
// modules/hbs/rpc/agent.go
func (t *Agent) BuiltinMetrics(args *model.AgentHeartbeatRequest, reply *model.BuiltinMetricResponse) error {
	if args.Hostname == "" {
		return nil
	}

	metrics, err := cache.GetBuiltinMetrics(args.Hostname)
	if err != nil {
		return nil
	}

	checksum := ""
	if len(metrics) > 0 {
		checksum = DigestBuiltinMetrics(metrics)
	}

	// if the agent already holds this exact list (same checksum),
	// send an empty list instead of resending it
	if args.Checksum == checksum {
		reply.Metrics = []*model.BuiltinMetric{}
	} else {
		reply.Metrics = metrics
	}
	reply.Checksum = checksum
	reply.Timestamp = time.Now().Unix()

	return nil
}
```
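The checksum lets hbs skip resending an unchanged metric list on every heartbeat. The sketch below reproduces that conditional-response logic with a simplified, hypothetical `BuiltinMetric` type. How `DigestBuiltinMetrics` actually hashes is not shown in the excerpt; MD5 over sorted `metric/tags` strings is an assumption chosen so that the digest does not depend on list order.

```go
package main

import (
	"crypto/md5"
	"fmt"
	"sort"
)

// BuiltinMetric is a simplified, hypothetical stand-in for model.BuiltinMetric.
type BuiltinMetric struct {
	Metric string
	Tags   string
}

// digest computes an order-independent checksum of the metric list.
// Assumption: MD5 over sorted "metric/tags" strings; the real DigestBuiltinMetrics may differ.
func digest(metrics []BuiltinMetric) string {
	keys := make([]string, len(metrics))
	for i, m := range metrics {
		keys[i] = m.Metric + "/" + m.Tags
	}
	sort.Strings(keys)
	h := md5.New()
	for _, k := range keys {
		h.Write([]byte(k))
		h.Write([]byte{'\n'}) // separator so "ab"+"c" differs from "a"+"bc"
	}
	return fmt.Sprintf("%x", h.Sum(nil))
}

// respond mirrors BuiltinMetrics: return the list only when the agent's
// checksum is stale, otherwise an empty list; always return the current checksum.
func respond(agentChecksum string, metrics []BuiltinMetric) ([]BuiltinMetric, string) {
	checksum := ""
	if len(metrics) > 0 {
		checksum = digest(metrics)
	}
	if agentChecksum == checksum {
		return []BuiltinMetric{}, checksum
	}
	return metrics, checksum
}

func main() {
	ms := []BuiltinMetric{{"net.port.listen", "port=8080"}, {"proc.num", "name=nginx"}}
	first, sum := respond("", ms) // first heartbeat: agent has no checksum yet
	second, _ := respond(sum, ms) // next heartbeat: agent sends the checksum back
	fmt.Println(len(first), len(second)) // 2 0
}
```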

As you can see, the RPC handlers above only read from the cache. hbs periodically queries the DB and keeps the results in a local cache for agents to query:
```go
// modules/hbs/cache/cache.go
func LoopInit() {
	for {
		// refresh once per minute
		time.Sleep(time.Minute)
		GroupPlugins.Init()
		// ....
	}
}
```

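Each refresh reads the data from the DB and then stores it locally. The new map is swapped in while holding the lock, so the update cannot race with concurrent readers or other writers: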
```go
// modules/hbs/cache/plugins.go
var GroupPlugins = &SafeGroupPlugins{M: make(map[int][]string)}

func (this *SafeGroupPlugins) Init() {
	m, err := db.QueryPlugins()
	if err != nil {
		return // on a DB error, keep serving the previous snapshot
	}

	this.Lock()
	defer this.Unlock()
	this.M = m
}
```

2. hbs serving judge

judge queries the alarm expressions:
```go
// modules/hbs/rpc/hbs.go
func (t *Hbs) GetExpressions(req model.NullRpcRequest, reply *model.ExpressionResponse) error {
	reply.Expressions = cache.ExpressionCache.Get()
	return nil
}
```

judge queries the alarm strategies. Because this data spans several DB tables, the handler has to assemble it in a fairly involved way:
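```go
// modules/hbs/rpc/hbs.go
func (t *Hbs) GetStrategies(req model.NullRpcRequest, reply *model.StrategiesResponse) error {
	reply.HostStrategies = []*model.HostStrategy{}

	hidTids := cache.HostTemplateIds.GetMap() // host ID -> template IDs
	sz := len(hidTids)
	hosts := cache.MonitoredHosts.Get() // monitored hosts, looked up by host ID
	tpls := cache.TemplateCache.GetMap()
	strategies := cache.Strategies.GetMap()
	tpl2Strategies := Tpl2Strategies(strategies) // index: template ID -> strategies

	hostStrategies := make([]*model.HostStrategy, 0, sz)
	for hostId, tplIds := range hidTids {
		// judge works with hostnames, so resolve the host ID; skip hosts not being monitored
		h, exists := hosts[hostId]
		if !exists {
			continue
		}

		// strategies that apply to this host, including those inherited through templates
		ss := CalcInheritStrategies(tpls, tplIds, tpl2Strategies)

		hs := model.HostStrategy{
			Hostname:   h.Name,
			Strategies: ss,
		}
		hostStrategies = append(hostStrategies, &hs)
	}

	reply.HostStrategies = hostStrategies
	return nil
}
```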
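The core of `GetStrategies` is a join: host → template IDs → strategies, keyed by hostname. The sketch below shows only that join, with simplified, hypothetical `Strategy` and `HostStrategy` types. Template inheritance (`CalcInheritStrategies`) is deliberately left out, so each host just gets the strategies attached directly to its templates.

```go
package main

import (
	"fmt"
	"sort"
)

// Simplified, hypothetical stand-ins for the model types.
type Strategy struct {
	ID    int
	TplID int
}

type HostStrategy struct {
	Hostname   string
	Strategies []Strategy
}

// tpl2Strategies builds the template ID -> strategies index.
func tpl2Strategies(all map[int]Strategy) map[int][]Strategy {
	idx := make(map[int][]Strategy)
	for _, s := range all {
		idx[s.TplID] = append(idx[s.TplID], s)
	}
	return idx
}

// hostStrategies joins host ID -> template IDs -> strategies and skips hosts
// missing from the monitored-host map. No template inheritance here.
func hostStrategies(hidTids map[int][]int, hosts map[int]string, all map[int]Strategy) []HostStrategy {
	idx := tpl2Strategies(all)
	out := make([]HostStrategy, 0, len(hidTids))
	for hostID, tplIDs := range hidTids {
		name, ok := hosts[hostID]
		if !ok {
			continue
		}
		var ss []Strategy
		for _, tid := range tplIDs {
			ss = append(ss, idx[tid]...)
		}
		if len(ss) == 0 {
			continue
		}
		sort.Slice(ss, func(i, j int) bool { return ss[i].ID < ss[j].ID })
		out = append(out, HostStrategy{Hostname: name, Strategies: ss})
	}
	return out
}

func main() {
	all := map[int]Strategy{1: {ID: 1, TplID: 100}, 2: {ID: 2, TplID: 200}, 3: {ID: 3, TplID: 100}}
	hidTids := map[int][]int{7: {100}, 8: {200}}
	hosts := map[int]string{7: "host-07"} // host 8 is not in the monitored set
	for _, hs := range hostStrategies(hidTids, hosts, all) {
		fmt.Println(hs.Hostname, len(hs.Strategies)) // host-07 2
	}
}
```

Building the template index once up front keeps the per-host loop to map lookups instead of a scan over all strategies for every host.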

Likewise, these RPC handlers only read from the cache. hbs periodically queries the DB and keeps the results in a local cache for judge to query:
```go
// modules/hbs/cache/cache.go
func LoopInit() {
	for {
		time.Sleep(time.Minute)
		// ......
		GroupTemplates.Init()
		HostGroupsMap.Init()
		HostMap.Init()
		TemplateCache.Init()
		Strategies.Init(TemplateCache.GetMap())
		HostTemplateIds.Init()
		ExpressionCache.Init()
	}
}
```

Each cache keeps its data in a map. On every periodic refresh, the data read from the DB replaces the whole map:
```go
// modules/hbs/cache/templates.go
type SafeTemplateCache struct {
	sync.RWMutex
	M map[int]*model.Template
}

var TemplateCache = &SafeTemplateCache{M: make(map[int]*model.Template)}

func (this *SafeTemplateCache) Init() {
	ts, err := db.QueryTemplates()
	if err != nil {
		return
	}

	this.Lock()
	defer this.Unlock()
	this.M = ts
}
```
