alertmanager 源码分析三: 流水线
前篇说到告警写入后被分发到dispatcher
的aggrGroupsPerRoute
中的aggrGroup
里,然后每个aggrGroup
会启动一个自己的goroutine
按照group_wait
和group_interval
两种频率来定时调用dispatcher.stage.Exec
方法来处理告警,实际上dispatcher.stage
中存储的就是由多种处理函数编排成的一个告警处理流水线,也就是架构图中的下面这部分:
pipeline
的构建是在main
函数中创建dispatcher
的时候,很容易找到,这里不赘述了,我们看看 pipeline 是怎样定义自己的 Exec 方法的,
// pipeline 就是 RoutingStage 类型, // 它是基于 ctx 中的 receiver 进入这个 receiver 的 Stage type RoutingStage map[string]Stage // 看看流水线构建函数是如何为每个 receiver 配置 Stage 的 func (pb *PipelineBuilder) New( receivers map[string][]Integration, wait func() time.Duration, inhibitor *inhibit.Inhibitor, silencer *silence.Silencer, muteTimes map[string][]timeinterval.TimeInterval, notificationLog NotificationLog, peer Peer, ) RoutingStage { rs := make(RoutingStage, len(receivers)) ms := NewGossipSettleStage(peer) is := NewMuteStage(inhibitor) ss := NewMuteStage(silencer) tms := NewTimeMuteStage(muteTimes) // 基于 receiver 的 name 编排了一个包含多个 Stage 对象的 MultiStage for name := range receivers { st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics) rs[name] = MultiStage{ms, is, tms, ss, st} } return rs } // 实现了 Exec 方法实际上是实现了 Stage 接口 // 流水线就是由各种 Stage 对象组合成的,后面再说 Stage 的设计, // 先看看 RoutingStage 做了什么 func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { // 从 context 中获取当前路由下配置的告警接收器 receiver, ok := ReceiverName(ctx) if !ok { return ctx, nil, errors.New("receiver missing") } // 从 RoutingStage 中找到对应的 MultiStage 执行 MultiStage.Exec s, ok := rs[receiver] if !ok { return ctx, nil, errors.New("stage for receiver missing") } return s.Exec(ctx, l, alerts...) } // 这个时候 aggGroup 经过 RoutingStage // 为这些 Alerts 找到了 MultiStage // 我们看看 MultiStage // MultiStage 就是个包含了多个 Stage 的数组 type MultiStage []Stage func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { var err error // 顺序执行使用数组中的每个 Stage.Exec() for _, s := range ms { if len(alerts) == 0 { return ctx, nil, nil } ctx, alerts, err = s.Exec(ctx, l, alerts...) if err != nil { return ctx, nil, err } } return ctx, alerts, nil } 复制代码
到这里总结一下,Dispatcher
下的每个aggGroup
先按照自己的receiver.Name
通过调用RoutingStage.Exec
中找到对应的MultiStage
,然后顺序调用其中的每个Stage.Exec
,接下来看下Stage
的设计:
type Stage interface { Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context context.Context, []*types.Alert, error) } // 举几个具体的 Stage 类型 type FanoutStage []Stage type GossipSettleStage struct { peer Peer } type MuteStage struct { muter types.Muter } 复制代码
Stage
这里是一个只约定了Exec
函数的接口,所以任何一个对象只要定义了相同签名的Exec
函数就是Stage
类型,你会在源码中很容找到各种Stage
,然后在对应的Exec
方法中就知道告警在当前Stage
中会被怎样处理,Exec
的入参数中alerts
表示哪些告警进入这个Stage
,然后出参中的alerts
就是经过当前Stage
处理还剩哪些告警,ctx
可以很方便各个Stage
获取当前流水线上的参数,当然也可以写入参数让后面的Stage
使用。
前面RoutingStage.Exec
和MultiStage.Exec
已经看过了我这里再找几个Stage
看看里面的具体行为:
// 负责并发的执行一些 Stage type FanoutStage []Stage func (fs FanoutStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { var ( wg sync.WaitGroup me types.MultiError ) wg.Add(len(fs)) // FanoutStage 和 MultiStage 使用的相同结构 []Stage // 但是 FanoutStage 是并发的执行 for _, s := range fs { go func(s Stage) { if _, _, err := s.Exec(ctx, l, alerts...); err != nil { me.Add(err) } wg.Done() }(s) } wg.Wait() if me.Len() > 0 { return ctx, alerts, &me } return ctx, alerts, nil } func createReceiverStage( name string, integrations []Integration, wait func() time.Duration, notificationLog NotificationLog, metrics *Metrics, ) Stage { // 这个是 FanoutStage 构建时 // 里面是多个可以并发的 MultiStage var fs FanoutStage for i := range integrations { recv := &nflogpb.Receiver{ GroupName: name, Integration: integrations[i].Name(), Idx: uint32(integrations[i].Index()), } var s MultiStage s = append(s, NewWaitStage(wait)) s = append(s, NewDedupStage(&integrations[i], notificationLog, recv)) s = append(s, NewRetryStage(integrations[i], name, metrics)) s = append(s, NewSetNotifiesStage(notificationLog, recv)) fs = append(fs, s) } return fs } 复制代码
再看看静默和抑制的Stage
type MuteStage struct { muter types.Muter } func NewMuteStage(m types.Muter) *MuteStage { return &MuteStage{muter: m} } func (n *MuteStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { var filtered []*types.Alert // 检查每个 alert 的 labelset 是否跟静默规则或者抑制规则的 labelSet 匹配 // 如果 alert 的 Labels 不匹配 Mute 就保留下来, 传给下一个 stage for _, a := range alerts { if !n.muter.Mutes(a.Labels) { filtered = append(filtered, a) } } return ctx, filtered, nil } 复制代码
MuteStage
被用来实现SilenceStage
和InhibitStage
, 它包含了一个 muter
,MuteStage.Exec
最重要的就是调用muter.Mutes
方法,那么muter
就是一个包含Mutes
方法的接口,Silencer
和Inhibitor
实现各自的 Mutes
方法就可以作为MuteStage
,那我们再看看它们各自是怎样实现Mutes
方法的:
// 这个就是 Inhibitor 实现的 Muter 接口 // 抑制功能设计是这样的: // 对于 a, b 两个 alert // 如果 a 满足 SourceMatchers // b 满足 TargetMatchers // 则 Equal 成立时, 用 a 抑制 b // Equal 成立的两个极端情况: // 1. a 和 b 都没有 Equal 中的 labels, 成立 // 2. a 和 b 都有 Equal 中的 labels, 且都为空值, 成立 // 关于抑制不生效的极端情况: // 1. a 同时满足 SourceMatchers, TargetMatchers, b 同时满足 SourceMatchers, TargetMatchers, 且 Equal 成立, 不生效 // 抑制不生效的极端情况是为了避免告警的自抑制 // 所以,告警写入阶段 Inhibitor 会通过 Sub 的方式监听新的 alert 并判断 source 侧是否匹配, // 匹配的话表示这个 alert 可能会抑制其他的 alert, 就会被缓存起来 // 在 Inhibitor 对应的 MuteStage 中取出来检查 func (ih *Inhibitor) Mutes(lset model.LabelSet) bool { fp := lset.Fingerprint() // 检查内存中所有 rule 是否匹配 lset for _, r := range ih.rules { // target 不匹配就没必要计算了 // 因为我们就是为了抑制 target if !r.TargetMatchers.Matches(lset) { continue } // target 匹配就检查 source, 如果 source 也匹配 // 那么就需要排除两端都匹配的情况 if inhibitedByFP, eq := r.hasEqual(lset, r.SourceMatchers.Matches(lset)); eq { ih.marker.SetInhibited(fp, inhibitedByFP.String()) return true } } // 这个位置没传 ids, 那么这个 alert 被置为 "active" ih.marker.SetInhibited(fp) return false } // 调用这个函数之前, 被检查 alert 已经满足了规则的 target, // 而 scache 中的 alert 已经满足了规则的 source // 剩下要确认的是: // scache 中的 alert 有没有标签和被检查 alert 标签一致的, // 再避免 alert 自我抑制的场景就可以了 func (r *InhibitRule) hasEqual(lset model.LabelSet, excludeTwoSidedMatch bool) (model.Fingerprint, bool) { Outer: for _, a := range r.scache.List() { // The cache might be stale and contain resolved alerts. if a.Resolved() { continue } // 检查规则标签 for n := range r.Equal { if a.Labels[n] != lset[n] { continue Outer } } // a 在加入 r.scache 的时候已经满足了 r.Source, 如果再通过 target 检查, 那么 scache 中的这个 a 同时满足 source 和 target // 而 excludeTwoSidedMatch 如果为 true, 表示当前 dispatcher 处理的 alert 在 source 和 target 都满足 // 所以这个条件变成了: // 如果被检查的 alert 标签还和 a 标签相同, 即抑制规则生效, 而且 a 和被检查的 alert 都同时满足 source 和 target, // 就忽略 a 对被检查 alert 的抑制, 这里防止了一个告警自己抑制自己情况 if excludeTwoSidedMatch && r.TargetMatchers.Matches(a.Labels) { continue Outer } // 出现一个抑制生效, 剩下的就不继续检查 return a.Fingerprint(), true } return model.Fingerprint(0), false } // 这个就是 Silencer 实现的 Muter 接口 func (s *Silencer) Mutes(lset model.LabelSet) bool { fp := lset.Fingerprint() activeIDs, pendingIDs, markerVersion, _ := s.marker.Silenced(fp) var ( err error allSils []*pb.Silence newVersion = markerVersion ) // 找出现在正在生效的静默规则 // 用来判断当前 alerts 哪些需要被静默掉 // version 相同表示 fp 标记时的 silences 到现在没有新的静默规则加入 if markerVersion == s.silences.Version() { totalSilences := len(activeIDs) + len(pendingIDs) if totalSilences == 0 { return false } allIDs := append(append(make([]string, 0, totalSilences), activeIDs...), pendingIDs...) allSils, _, err = s.silences.Query( // 静默规则是用户在 web 端写入 // 这个位置使用 ids 和两种状态来过滤出需要判断的静默规则 // 这个 query 的封装也很特别,我后面会在golang代码设计的文章中聊 QIDs(allIDs...), QState(types.SilenceStateActive, types.SilenceStatePending), ) } else { allSils, newVersion, err = s.silences.Query( QState(types.SilenceStateActive, types.SilenceStatePending), QMatches(lset), ) } if err != nil { level.Error(s.logger).Log("msg", "Querying silences failed, alerts might not get silenced correctly", "err", err) } if len(allSils) == 0 { s.marker.SetSilenced(fp, newVersion, nil, nil) return false } activeIDs, pendingIDs = nil, nil now := s.silences.now() // 这里仅根据搜索结果的数量就判断是否需要静默当前的 alert // 并没有计算 silence 的时间区间和当前时间是否重合, // 因为 silence 有效的计算是在 Maintenance 过程中使用 GC 来维护的 // 所以匹配的一定是现在就生效的 for _, sil := range allSils { switch getState(sil, now) { case types.SilenceStatePending: pendingIDs = append(pendingIDs, sil.Id) case types.SilenceStateActive: activeIDs = append(activeIDs, sil.Id) default: // Do nothing, silence has expired in the meantime. } } level.Debug(s.logger).Log( "msg", "determined current silences state", "now", now, "total", len(allSils), "active", len(activeIDs), "pending", len(pendingIDs), ) sort.Strings(activeIDs) sort.Strings(pendingIDs) // activeIDs 为空且没有 inhibitBy 的话, fp 仍然会是 active 的 // pendingIDs 不会对 fp 的状态有影响 s.marker.SetSilenced(fp, newVersion, activeIDs, pendingIDs) return len(activeIDs) > 0 } 复制代码
到这里,流水线的大致情况就介绍的差不多了,总结一下:
先约定
Stage
接口,再定义一些控制流程的
Stage
,比如RoutingStage
,MultiStage
,FanoutStage
等然后根据需要定义一些对
alerts
做真正处理的的Stage
,比如InhibitStage
,SilenceStage
,TimeMuteStage
等最后把这些处理
alerts
的Stage
使用流程控制的Stage
进行编排,就成了流水线
作者:cui
链接:https://juejin.cn/post/7032476509449240584