Open-falcon judge源码分析
judge模块使用历史数据进行告警判定,其历史数据保存在内存中(仅保存最近的数据),若触发告警,则发送event至redis,由alarm模块消费处理redis的事件。
由于judge模块将最近的历史数据保存在内存,因此它是有状态的;当节点宕机时,内存中的历史数据将丢失。然而,由于监控数据会源源不断的上报,它会上报到其它judge节点,在其它节点重新进行告警判定。
judge模块的职责:
- 接收transfer转发的数据,并仅存储最近的几个点,以进行告警判定;配置remain=11,也就是根据最新的10个点数据,判定是否触发了告警;
- 从hbs同步告警策略,hbs缓存了用户配置的策略列表,judge使用1个rpc长连接查询策略列表;
- 判断数据是否达到阈值产生告警事件:根据历史数据和策略表达式,判断是否达到阈值;
- 判断告警事件是否应该写入redis:
- 告警事件不一定写入redis,需要根据配置的最大报警次数来确定是否写入redis;
- 比如配置最大报警次数=3,当第4次产生报警事件的时候,就不会写入redis;
文章图片
1. 接收数据:大Map judge接收transfer转发的数据,将数据存入本地内存:
// modules/judge/rpc/receiver.go
func (this *Judge) Send(items []*model.JudgeItem, resp *model.SimpleRpcResponse) error {
remain := g.Config().Remain
now := time.Now().Unix()
for _, item := range items {
.......
pk := item.PrimaryKey()
store.HistoryBigMap[pk[0:2]].PushFrontAndMaintain(pk, item, remain, now)
}
return nil
}
本地的store.HistoryBigMap是个大Map: 为了减轻大Map的并发读写压力,对itemKey的md5的前2个字符进行了拆分,分成了16*16=256个小Map,每个小Map内并发读写加锁,降低了锁粒度:
// modules/judge/store/history.go
var HistoryBigMap = make(map[string]*JudgeItemMap)func InitHistoryBigMap() {
arr := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"}
for i := 0;
i < 16;
i++ {
for j := 0;
j < 16;
j++ {
HistoryBigMap[arr[i]+arr[j]] = NewJudgeItemMap()
}
}
}type JudgeItemMap struct {
sync.RWMutex
M map[string]*SafeLinkedList
}
小map中:map[string]*SafeLinkedList,key=itemKey,value=https://www.it610.com/article/最近的11个点,当有新数据时,会把老的数据从list中删掉;
magicNumber=11(最近的11个点)在配置文件中指定,是一个经验数据,一般来讲根据最近的11个点来判定告警触发已足够;
小map读写时使用Lock进行并发控制;
// modules/judge/store/linkedlist.go
func (this *SafeLinkedList) PushFrontAndMaintain(v *model.JudgeItem, maxCount int) bool {
this.Lock()
defer this.Unlock()sz := this.L.Len()
// 新数据push进list
this.L.PushFront(v)sz++
if sz <= maxCount {
return true
}
// 超过了11个点,删掉老数据
del := sz - maxCount
for i := 0;
i < del;
i++ {
this.L.Remove(this.L.Back())
}return true
}
2. 告警判定 告警判定时,会解析如下的逻辑表达式:
- all(#3) > 80、max(#3) > 80、min(#3) > 80、sum(#3) > 80、avg(#3) > 80;
- 针对最新的几个点:
- all(#3):表示最近的3个点都满足阈值;
- max(#3): 标识最近3个点的最大值满足阈值;
- min(#3): 标识最近3个点的最小值满足阈值;
- sum(#3): 标识最近3个点的和满足阈值;
- avg(#3): 标识最近3个点的平均值满足阈值;
// modules/judge/store/judge.go
func Judge(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {
CheckStrategy(L, firstItem, now)
CheckExpression(L, firstItem, now)
}
先判定该条数据是否有关联的告警策略:也可能关联多个告警策略
// modules/judge/store/judge.go
func CheckStrategy(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {
key := fmt.Sprintf("%s/%s", firstItem.Endpoint, firstItem.Metric)
strategyMap := g.StrategyMap.Get()
strategies, exists := strategyMap[key]
if !exists {
return
}
for _, s := range strategies {
...
judgeItemWithStrategy(L, s, firstItem, now)
}
}
判定时,先解析判定函数fn;再看是否满足判定要求的点数,若点数不够则直接返回;最后根据fn.Compute()的结果,决定是否将告警事件发送redis:
// modules/judge/store/judge.go
func judgeItemWithStrategy(L *SafeLinkedList, strategy model.Strategy, firstItem *model.JudgeItem, now int64) {
fn, err := ParseFuncFromString(strategy.Func, strategy.Operator, strategy.RightValue)
historyData, leftValue, isTriggered, isEnough := fn.Compute(L)
// 当前的数据点太少,不足以做告警判定
if !isEnough {
return
}
// 触发阈值,产生告警事件
event := &model.Event{
Id:fmt.Sprintf("s_%d_%s", strategy.Id, firstItem.PrimaryKey()),
Strategy:&strategy,
Endpoint:firstItem.Endpoint,
LeftValue:leftValue,
EventTime:firstItem.Timestamp,
PushedTags: firstItem.Tags,
}
sendEventIfNeed(historyData, isTriggered, now, event, strategy.MaxStep)
}
告警事件是否发送redis,跟上次的事件相关,跟最大告警次数也相关:
- 若本次触发触发阈值:
- 若上次未触发或上次是OK,那么本次产生告警事件;
- 若已超过最大报警次数,则返回;
- 最后,将本次事件的告警次数+1,发送告警事件;
- 若本次未触发阈值:
- 若上次产生了Problem事件,则将其恢复,产生恢复的告警事件;
// modules/judge/store/judge.go
func sendEventIfNeed(historyData []*model.HistoryData, isTriggered bool, now int64, event *model.Event, maxStep int) {
lastEvent, exists := g.LastEvents.Get(event.Id)
if isTriggered {
event.Status = "PROBLEM"
// 上次未触发或者上次是Ok,那么本次产生告警事件
if !exists || lastEvent.Status[0] == 'O' {
event.CurrentStep = 1
sendEvent(event)
return
}
// 已超过最大报警次数
if lastEvent.CurrentStep >= maxStep {
return
}
event.CurrentStep = lastEvent.CurrentStep + 1
sendEvent(event)
} else {
// 本次未触发,如果lastEvent是Problem,则本次将其恢复
if exists && lastEvent.Status[0] == 'P' {
event.Status = "OK"
event.CurrentStep = 1
sendEvent(event)
}
}
}
3. 告警事件写入redis 【Open-falcon judge源码分析】告警事件通过lpush命令写入redis的队列,不同的告警等级写入不同的队列,其队列名称=event:p{level},告警事件最终被alarm组件消费:
// modules/judge/store/judge.go
func sendEvent(event *model.Event) {
// update last event
g.LastEvents.Set(event.Id, event)bs, err := json.Marshal(event)
// "event:p%v"
redisKey := fmt.Sprintf(g.Config().Alarm.QueuePattern, event.Priority())
rc := g.RedisConnPool.Get()
defer rc.Close()
rc.Do("LPUSH", redisKey, string(bs))
}
推荐阅读
- Android事件传递源码分析
- Quartz|Quartz 源码解析(四) —— QuartzScheduler和Listener事件监听
- [源码解析]|[源码解析] NVIDIA HugeCTR,GPU版本参数服务器---(3)
- ffmpeg源码分析01(结构体)
- Java程序员阅读源码的小技巧,原来大牛都是这样读的,赶紧看看!
- Vue源码分析—响应式原理(二)
- SwiftUI|SwiftUI iOS 瀑布流组件之仿CollectionView不规则图文混合(教程含源码)
- java|java b2b2c shop 多用户商城系统源码- config 修改配置
- Spring源码解析_属性赋值
- Android下的IO库-Okio源码解析(一)|Android下的IO库-Okio源码解析(一) 入门