阅读 203

alertmanager 源码分析三: 流水线

前篇说到告警写入后被分发到dispatcheraggrGroupsPerRoute中的aggrGroup里,然后每个aggrGroup会启动一个自己的goroutine按照group_waitgroup_interval两种频率来定时调用dispatcher.stage.Exec方法来处理告警,实际上dispatcher.stage中存储的就是由多种处理函数编排成的一个告警处理流水线,也就是架构图中的下面这部分:

截屏2021-11-19 17.07.37.png

pipeline的构建是在main函数中创建dispatcher的时候,很容易找到,这里不赘述了,我们看看 pipeline 是怎样定义自己的 Exec 方法的,

// pipeline 就是 RoutingStage 类型, // 它是基于 ctx 中的 receiver 进入这个 receiver 的 Stage type RoutingStage map[string]Stage // 看看流水线构建函数是如何为每个 receiver 配置 Stage 的 func (pb *PipelineBuilder) New(    receivers map[string][]Integration,    wait func() time.Duration,    inhibitor *inhibit.Inhibitor,    silencer *silence.Silencer,    muteTimes map[string][]timeinterval.TimeInterval,    notificationLog NotificationLog,    peer Peer, ) RoutingStage {    rs := make(RoutingStage, len(receivers))    ms := NewGossipSettleStage(peer)    is := NewMuteStage(inhibitor)    ss := NewMuteStage(silencer)    tms := NewTimeMuteStage(muteTimes)    // 基于 receiver 的 name 编排了一个包含多个 Stage 对象的 MultiStage    for name := range receivers {       st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics)       rs[name] = MultiStage{ms, is, tms, ss, st}    }    return rs } // 实现了 Exec 方法实际上是实现了 Stage 接口 // 流水线就是由各种 Stage 对象组合成的,后面再说 Stage 的设计, // 先看看 RoutingStage 做了什么 func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {    // 从 context 中获取当前路由下配置的告警接收器    receiver, ok := ReceiverName(ctx)    if !ok {       return ctx, nil, errors.New("receiver missing")    }    // 从 RoutingStage 中找到对应的 MultiStage 执行 MultiStage.Exec    s, ok := rs[receiver]    if !ok {       return ctx, nil, errors.New("stage for receiver missing")    }    return s.Exec(ctx, l, alerts...) } // 这个时候 aggGroup 经过 RoutingStage  // 为这些 Alerts 找到了 MultiStage // 我们看看 MultiStage // MultiStage 就是个包含了多个 Stage 的数组 type MultiStage []Stage func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {    var err error    // 顺序执行使用数组中的每个 Stage.Exec()    for _, s := range ms {       if len(alerts) == 0 {          return ctx, nil, nil       }       ctx, alerts, err = s.Exec(ctx, l, alerts...)       if err != nil {          return ctx, nil, err       }    }    return ctx, alerts, nil } 复制代码

到这里总结一下,Dispatcher下的每个aggGroup先按照自己的receiver.Name通过调用RoutingStage.Exec中找到对应的MultiStage,然后顺序调用其中的每个Stage.Exec,接下来看下Stage的设计:

type Stage interface {    Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context context.Context, []*types.Alert, error) } // 举几个具体的 Stage 类型 type FanoutStage []Stage type GossipSettleStage struct { peer Peer } type MuteStage struct { muter types.Muter } 复制代码

Stage这里是一个只约定了Exec函数的接口,所以任何一个对象只要定义了相同签名的Exec函数就是Stage类型,你会在源码中很容找到各种Stage,然后在对应的Exec方法中就知道告警在当前Stage中会被怎样处理,Exec的入参数中alerts表示哪些告警进入这个Stage,然后出参中的alerts就是经过当前Stage处理还剩哪些告警,ctx可以很方便各个Stage获取当前流水线上的参数,当然也可以写入参数让后面的Stage使用。

前面RoutingStage.ExecMultiStage.Exec已经看过了我这里再找几个Stage看看里面的具体行为:

// 负责并发的执行一些 Stage type FanoutStage []Stage func (fs FanoutStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {    var (       wg sync.WaitGroup       me types.MultiError    )    wg.Add(len(fs))    // FanoutStage 和 MultiStage 使用的相同结构 []Stage    // 但是 FanoutStage 是并发的执行    for _, s := range fs {       go func(s Stage) {          if _, _, err := s.Exec(ctx, l, alerts...); err != nil {             me.Add(err)          }          wg.Done()       }(s)    }    wg.Wait()    if me.Len() > 0 {       return ctx, alerts, &me    }    return ctx, alerts, nil } func createReceiverStage(    name string,    integrations []Integration,    wait func() time.Duration,    notificationLog NotificationLog,    metrics *Metrics, ) Stage {    // 这个是 FanoutStage 构建时    // 里面是多个可以并发的 MultiStage    var fs FanoutStage    for i := range integrations {       recv := &nflogpb.Receiver{          GroupName:   name,          Integration: integrations[i].Name(),          Idx:         uint32(integrations[i].Index()),       }       var s MultiStage       s = append(s, NewWaitStage(wait))       s = append(s, NewDedupStage(&integrations[i], notificationLog, recv))       s = append(s, NewRetryStage(integrations[i], name, metrics))       s = append(s, NewSetNotifiesStage(notificationLog, recv))       fs = append(fs, s)    }    return fs } 复制代码

再看看静默和抑制的Stage

type MuteStage struct {    muter types.Muter } func NewMuteStage(m types.Muter) *MuteStage {    return &MuteStage{muter: m} } func (n *MuteStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {    var filtered []*types.Alert    // 检查每个 alert 的 labelset 是否跟静默规则或者抑制规则的 labelSet 匹配    // 如果 alert 的 Labels 不匹配 Mute 就保留下来, 传给下一个 stage    for _, a := range alerts {       if !n.muter.Mutes(a.Labels) {          filtered = append(filtered, a)       }    }    return ctx, filtered, nil } 复制代码

MuteStage被用来实现SilenceStageInhibitStage, 它包含了一个 muterMuteStage.Exec最重要的就是调用muter.Mutes方法,那么muter就是一个包含Mutes方法的接口,SilencerInhibitor实现各自的 Mutes方法就可以作为MuteStage,那我们再看看它们各自是怎样实现Mutes方法的:

// 这个就是 Inhibitor 实现的 Muter 接口 // 抑制功能设计是这样的: // 对于 a, b 两个 alert // 如果 a 满足 SourceMatchers // b 满足 TargetMatchers // 则 Equal 成立时, 用 a 抑制 b // Equal 成立的两个极端情况: //    1. a 和 b 都没有 Equal 中的 labels, 成立 //    2. a 和 b 都有 Equal 中的 labels, 且都为空值, 成立 // 关于抑制不生效的极端情况: //    1. a 同时满足 SourceMatchers, TargetMatchers, b 同时满足 SourceMatchers, TargetMatchers, 且 Equal 成立, 不生效 // 抑制不生效的极端情况是为了避免告警的自抑制 // 所以,告警写入阶段 Inhibitor 会通过 Sub 的方式监听新的 alert 并判断 source 侧是否匹配,  // 匹配的话表示这个 alert 可能会抑制其他的 alert, 就会被缓存起来 // 在 Inhibitor 对应的 MuteStage 中取出来检查 func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {    fp := lset.Fingerprint()    // 检查内存中所有 rule 是否匹配 lset    for _, r := range ih.rules {       // target 不匹配就没必要计算了       // 因为我们就是为了抑制 target       if !r.TargetMatchers.Matches(lset) {          continue       }       // target 匹配就检查 source, 如果 source 也匹配       // 那么就需要排除两端都匹配的情况       if inhibitedByFP, eq := r.hasEqual(lset, r.SourceMatchers.Matches(lset)); eq {          ih.marker.SetInhibited(fp, inhibitedByFP.String())          return true       }    }    // 这个位置没传 ids, 那么这个 alert 被置为 "active"    ih.marker.SetInhibited(fp)    return false } // 调用这个函数之前, 被检查 alert 已经满足了规则的 target, // 而 scache 中的 alert 已经满足了规则的 source // 剩下要确认的是: //   scache 中的 alert 有没有标签和被检查 alert 标签一致的, //   再避免 alert 自我抑制的场景就可以了 func (r *InhibitRule) hasEqual(lset model.LabelSet, excludeTwoSidedMatch bool) (model.Fingerprint, bool) { Outer:    for _, a := range r.scache.List() {       // The cache might be stale and contain resolved alerts.       if a.Resolved() {          continue       }       // 检查规则标签       for n := range r.Equal {          if a.Labels[n] != lset[n] {             continue Outer          }       }       // a 在加入 r.scache 的时候已经满足了 r.Source, 如果再通过 target 检查, 那么 scache 中的这个 a 同时满足 source 和 target       // 而 excludeTwoSidedMatch 如果为 true, 表示当前 dispatcher 处理的 alert 在 source 和 target 都满足       // 所以这个条件变成了:       // 如果被检查的 alert 标签还和 a 标签相同, 即抑制规则生效, 而且 a 和被检查的 alert 都同时满足 source 和 target,       // 就忽略 a 对被检查 alert 的抑制, 这里防止了一个告警自己抑制自己情况       if excludeTwoSidedMatch && r.TargetMatchers.Matches(a.Labels) {          continue Outer       }       // 出现一个抑制生效, 剩下的就不继续检查       return a.Fingerprint(), true    }    return model.Fingerprint(0), false } // 这个就是 Silencer 实现的 Muter 接口 func (s *Silencer) Mutes(lset model.LabelSet) bool {    fp := lset.Fingerprint()    activeIDs, pendingIDs, markerVersion, _ := s.marker.Silenced(fp)    var (       err        error       allSils    []*pb.Silence       newVersion = markerVersion    )    // 找出现在正在生效的静默规则    // 用来判断当前 alerts 哪些需要被静默掉    // version 相同表示 fp 标记时的 silences 到现在没有新的静默规则加入    if markerVersion == s.silences.Version() {       totalSilences := len(activeIDs) + len(pendingIDs)       if totalSilences == 0 {          return false       }       allIDs := append(append(make([]string, 0, totalSilences), activeIDs...), pendingIDs...)       allSils, _, err = s.silences.Query(          // 静默规则是用户在 web 端写入          // 这个位置使用 ids 和两种状态来过滤出需要判断的静默规则          // 这个 query 的封装也很特别,我后面会在golang代码设计的文章中聊          QIDs(allIDs...),          QState(types.SilenceStateActive, types.SilenceStatePending),       )    } else {       allSils, newVersion, err = s.silences.Query(          QState(types.SilenceStateActive, types.SilenceStatePending),          QMatches(lset),       )    }    if err != nil {       level.Error(s.logger).Log("msg", "Querying silences failed, alerts might not get silenced correctly", "err", err)    }    if len(allSils) == 0 {       s.marker.SetSilenced(fp, newVersion, nil, nil)       return false    }    activeIDs, pendingIDs = nil, nil    now := s.silences.now()    // 这里仅根据搜索结果的数量就判断是否需要静默当前的 alert    // 并没有计算 silence 的时间区间和当前时间是否重合,    // 因为 silence 有效的计算是在 Maintenance 过程中使用 GC 来维护的    // 所以匹配的一定是现在就生效的    for _, sil := range allSils {       switch getState(sil, now) {       case types.SilenceStatePending:          pendingIDs = append(pendingIDs, sil.Id)       case types.SilenceStateActive:          activeIDs = append(activeIDs, sil.Id)       default:          // Do nothing, silence has expired in the meantime.       }    }    level.Debug(s.logger).Log(       "msg", "determined current silences state",       "now", now,       "total", len(allSils),       "active", len(activeIDs),       "pending", len(pendingIDs),    )    sort.Strings(activeIDs)    sort.Strings(pendingIDs)    // activeIDs 为空且没有 inhibitBy 的话, fp 仍然会是 active 的    // pendingIDs 不会对 fp 的状态有影响    s.marker.SetSilenced(fp, newVersion, activeIDs, pendingIDs)    return len(activeIDs) > 0 } 复制代码

到这里,流水线的大致情况就介绍的差不多了,总结一下:

  1. 先约定Stage接口,

  2. 再定义一些控制流程的Stage,比如RoutingStageMultiStageFanoutStage

  3. 然后根据需要定义一些对alerts做真正处理的的Stage,比如InhibitStageSilenceStageTimeMuteStage

  4. 最后把这些处理alertsStage使用流程控制的Stage进行编排,就成了流水线


作者:cui
链接:https://juejin.cn/post/7032476509449240584

文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐