Open-falcon hbs源码解读
hbs负责周期性的读取db中内容,缓存到本地cache,然后提供RPC接口以供agent和judge两个组件查询调用。
// modules/hbs/rpc/rpc.go
func Start() {
server := rpc.NewServer()
server.Register(new(Agent))
server.Register(new(Hbs))
l, e := net.Listen("tcp", addr)
for {
conn, err := l.Accept()
......
go server.ServeCodec(jsonrpc.NewServerCodec(conn))
}
}
hbs对接agent:
- 处理agent heartbeat请求;
- 处理agent plugin的查询请求;
- 处理agent 监控哪些进程、哪些端口的查询请求;
- 查询当前配置的所有告警策略;
- 查询当前配置的所有告警表达式;
文章图片
1. hbs对接agent 【Open-falcon hbs源码解读】agent查询执行哪些plugin:
// modules/hbs/rpc/agent.go
func (t *Agent) MinePlugins(args model.AgentHeartbeatRequest, reply *model.AgentPluginsResponse) error {
if args.Hostname == "" {
return nil
}reply.Plugins = cache.GetPlugins(args.Hostname)
reply.Timestamp = time.Now().Unix()return nil
}
agent查询监控哪些进程、哪些端口:
// modules/hbs/rpc/agent.go
func (t *Agent) BuiltinMetrics(args *model.AgentHeartbeatRequest, reply *model.BuiltinMetricResponse) error {
if args.Hostname == "" {
return nil
}
metrics, err := cache.GetBuiltinMetrics(args.Hostname)
if err != nil {
return nil
}
checksum := ""
if len(metrics) > 0 {
checksum = DigestBuiltinMetrics(metrics)
}
if args.Checksum == checksum {
reply.Metrics = []*model.BuiltinMetric{}
} else {
reply.Metrics = metrics
}
reply.Checksum = checksum
reply.Timestamp = time.Now().Unix()
return nil
}
可以看到,上面的rpc接口操作的都是cache,hbs会定期的从db中查询数据,然后缓存在本地cache,以供agent查询:
// modules/hbs/cache/cache.go
func LoopInit() {
for {
//1min周期
time.Sleep(time.Minute)
GroupPlugins.Init()
....
}
}
从db读取数据,然后保存在本地:为了防止并发写data,这里加了锁
// moduels/hbs/cache/plugins.go
var GroupPlugins = &SafeGroupPlugins{M: make(map[int][]string)}func (this *SafeGroupPlugins) Init() {
m, err := db.QueryPlugins()
if err != nil {
return
}this.Lock()
defer this.Unlock()
this.M = m
}
2. hbs对接judge judge查询告警表达式:
// modules/hbs/rpc/hbs.go
func (t *Hbs) GetExpressions(req model.NullRpcRequest, reply *model.ExpressionResponse) error {
reply.Expressions = cache.ExpressionCache.Get()
return nil
}
judge查询告警策略:由于关联多个db table,这里进行了比较复杂的拼装
// modules/hbs/rpc/hbs.go
func (t *Hbs) GetStrategies(req model.NullRpcRequest, reply *model.StrategiesResponse) error {
reply.HostStrategies = []*model.HostStrategy{}
hidTids := cache.HostTemplateIds.GetMap()
hosts := cache.MonitoredHosts.Get()
tpls := cache.TemplateCache.GetMap()
strategies := cache.Strategies.GetMap()
tpl2Strategies := Tpl2Strategies(strategies)
hostStrategies := make([]*model.HostStrategy, 0, sz)
for hostId, tplIds := range hidTids {
ss := CalcInheritStrategies(tpls, tplIds, tpl2Strategies)
hs := model.HostStrategy{
Hostname:h.Name,
Strategies: ss,
}
hostStrategies = append(hostStrategies, &hs)
}
reply.HostStrategies = hostStrategies
return nil
}
同样的,上面的rpc接口操作的都是cache,hbs会定期的从db中查询数据,然后缓存在本地cache,以供judge查询:
// modules/hbs/cache/cache.go
func LoopInit() {
for {
time.Sleep(time.Minute)
......
GroupTemplates.Init()
HostGroupsMap.Init()
HostMap.Init()
TemplateCache.Init()
Strategies.Init(TemplateCache.GetMap())
HostTemplateIds.Init()
ExpressionCache.Init()
}
}
cache的数据保存在map中,然后读取db中的数据(定期),覆盖掉map
// modules/hbs/cache/templates.go
type SafeTemplateCache struct {
sync.RWMutex
M map[int]*model.Template
}var TemplateCache = &SafeTemplateCache{M: make(map[int]*model.Template)}func (this *SafeTemplateCache) Init() {
ts, err := db.QueryTemplates()
if err != nil {
return
}
this.Lock()
defer this.Unlock()
this.M = ts
}
推荐阅读
- Android事件传递源码分析
- Quartz|Quartz 源码解析(四) —— QuartzScheduler和Listener事件监听
- [源码解析]|[源码解析] NVIDIA HugeCTR,GPU版本参数服务器---(3)
- ffmpeg源码分析01(结构体)
- Java程序员阅读源码的小技巧,原来大牛都是这样读的,赶紧看看!
- Vue源码分析—响应式原理(二)
- SwiftUI|SwiftUI iOS 瀑布流组件之仿CollectionView不规则图文混合(教程含源码)
- java|java b2b2c shop 多用户商城系统源码- config 修改配置
- Spring源码解析_属性赋值
- Android下的IO库-Okio源码解析(一)|Android下的IO库-Okio源码解析(一) 入门