Code reading notes on aggregation in carbon-relay-ng

Introduction

These are my notes from reading the code in aggregator/aggregator.go and aggregator/processor.go of grafana/carbon-relay-ng: Fast carbon relay+aggregator with admin interfaces for making changes online - production ready.

This time the focus is on the implementation of Sum and how it is used.

Definition of the Sum struct

aggregator/processor.go#L282-L301

// Sum aggregates to sum
type Sum struct {
  sum float64
}

func NewSum(val float64, ts uint32) Processor {
  return &Sum{
    sum: val,
  }
}

func (s *Sum) Add(val float64, ts uint32) {
  s.sum += val
}

func (s *Sum) Flush() ([]processorResult, bool) {
  return []processorResult{
    {fcnName: "sum", val: s.sum},
  }, true
}

The Processor interface

aggregator/processor.go#L303-L310

type Processor interface {
  // Add adds a point to aggregate
  Add(val float64, ts uint32)
  // Flush returns the aggregated value(s) and true if it is valid
  // the only reason why it would be non-valid is for aggregators that need
  // more than 1 value but they didn't have enough to produce a useful result.
  Flush() ([]processorResult, bool)
}
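
To get a feel for this contract, here is a minimal, self-contained sketch (not the project's actual code; processorResult and Processor are reduced to just what is quoted above) that drives a Sum-like processor the same way the aggregator does: construct it with the first value, Add the following values, then Flush.

package main

import "fmt"

// simplified stand-ins for the types in aggregator/processor.go
type processorResult struct {
  fcnName string
  val     float64
}

type Processor interface {
  Add(val float64, ts uint32)
  Flush() ([]processorResult, bool)
}

type Sum struct {
  sum float64
}

func NewSum(val float64, ts uint32) Processor {
  return &Sum{sum: val}
}

func (s *Sum) Add(val float64, ts uint32) {
  s.sum += val
}

func (s *Sum) Flush() ([]processorResult, bool) {
  return []processorResult{{fcnName: "sum", val: s.sum}}, true
}

func main() {
  // the aggregator creates the processor with the first value of a bucket...
  var p Processor = NewSum(1.5, 1592300400)
  // ...and feeds later values of the same bucket through Add
  p.Add(2.5, 1592300410)
  p.Add(4.0, 1592300420)

  // Flush returns the aggregated result(s); ok is always true for Sum
  if results, ok := p.Flush(); ok {
    for _, r := range results {
      fmt.Printf("%s = %f\n", r.fcnName, r.val) // sum = 8.000000
    }
  }
}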

Where the Processor interface's Add method is called

aggregator/aggregator.go#L126-L166

func (a *Aggregator) AddOrCreate(key string, ts uint32, quantized uint, value float64) {
  rangeTracker.Sample(ts)
  agg, ok := a.aggregations[quantized]
  var proc Processor
  if ok {
    proc, ok = agg.state[key]
    if ok {
      // if both levels already exist, we only need to add the value
      agg.count++
      proc.Add(value, ts)
      return
    }
  } else {
    // first level doesn't exist. create it and add the ts to the list
    // (second level will be created below)
    a.tsList = append(a.tsList, quantized)
    if len(a.tsList) > 1 && a.tsList[len(a.tsList)-2] > quantized {
      sort.Sort(TsSlice(a.tsList))
    }
    agg = &aggregation{
      state: make(map[string]Processor),
    }
    a.aggregations[quantized] = agg
  }

  // first level exists but we need to create the 2nd level.

  // note, we only flush where for a given value of now, quantized < now-wait
  // this means that as long as the clock doesn't go back in time
  // we never recreate a previously created bucket (and reflush with same key and ts)
  // a consequence of this is, that if your data stream runs consistently significantly behind
  // real time, it may never be included in aggregates, but it's up to you to configure your wait
  // parameter properly. You can use the rangeTracker and numTooOld metrics to help with this
  if quantized > uint(a.now().Unix())-a.Wait {
    agg.count++
    proc = a.procConstr(value, ts)
    agg.state[key] = proc
    return
  }
  numTooOld.Inc(1)
}
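
The bucket selection and the "too old" check come down to simple integer arithmetic. Below is a small standalone sketch, assuming Interval = 60 and Wait = 120 (hypothetical values), of the decision AddOrCreate makes for a given metric timestamp.

package main

import (
  "fmt"
  "time"
)

func main() {
  const interval uint = 60 // plays the role of Aggregator.Interval
  const wait uint = 120    // plays the role of Aggregator.Wait

  now := uint(time.Now().Unix())

  for _, ts := range []uint{now - 30, now - 90, now - 300} {
    // same quantization as in run(): align the timestamp to the interval
    quantized := ts - (ts % interval)

    // same condition as in AddOrCreate(): only start a new bucket if it
    // could not already have been flushed for this value of "now"
    accepted := quantized > now-wait
    fmt.Printf("ts=%d quantized=%d accepted=%v\n", ts, quantized, accepted)
  }
}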

Aggregator's AddOrCreate is called from the run method.

aggregator/aggregator.go#L281-L351

func (a *Aggregator) run() {
  for {
    select {
    case msg := <-a.in:
      // note, we rely here on the fact that the packet has already been validated
      outKey, ok := a.matchWithCache(msg.buf[0])
      if !ok {
        continue
      }
      a.numIn.Inc(1)
      ts := uint(msg.ts)
      quantized := ts - (ts % a.Interval)
      a.AddOrCreate(outKey, msg.ts, quantized, msg.val)
    case now := <-a.tick:
      thresh := now.Add(-time.Duration(a.Wait) * time.Second)
      a.Flush(uint(thresh.Unix()))

      // if cache is enabled, clean it out of stale entries
      // it's not ideal to block our channel while flushing AND cleaning up the cache
      // ideally, these operations are interleaved in time, but we can optimize that later
      // this is a simple heuristic but should make the cache always converge on only active data (without memory leaks)
      // even though some cruft may temporarily linger a bit longer.
      // WARNING: this relies on Go's map implementation detail which randomizes iteration order, in order for us to reach
      // the entire keyspace. This may stop working properly with future go releases.  Will need to come up with smth better.
      if a.reCache != nil {
        cutoff := uint32(now.Add(-100 * time.Duration(a.Wait) * time.Second).Unix())
        a.reCacheMutex.Lock()
        for k, v := range a.reCache {
          if v.seen < cutoff {
            delete(a.reCache, k)
          } else {
            break // stop looking when we don't see old entries. we'll look again soon enough.
          }
        }
        a.reCacheMutex.Unlock()
      }
    case <-a.snapReq:
      aggsCopy := make(map[uint]*aggregation)
      for quant, aggReal := range a.aggregations {
        stateCopy := make(map[string]Processor)
        for key := range aggReal.state {
          stateCopy[key] = nil
        }
        aggsCopy[quant] = &aggregation{
          state: stateCopy,
          count: aggReal.count,
        }
      }
      s := &Aggregator{
        Fun:          a.Fun,
        procConstr:   a.procConstr,
        Matcher:      a.Matcher,
        OutFmt:       a.OutFmt,
        Cache:        a.Cache,
        Interval:     a.Interval,
        Wait:         a.Wait,
        DropRaw:      a.DropRaw,
        aggregations: aggsCopy,
        now:          time.Now,
        Key:          a.Key,
      }
      a.snapResp <- s
    case <-a.shutdown:
      thresh := a.now().Add(-time.Duration(a.Wait) * time.Second)
      a.Flush(uint(thresh.Unix()))
      a.wg.Done()
      return

    }
  }
}
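
As a side note, the snapReq / snapResp case above is an instance of a common Go pattern: instead of locking shared state, other goroutines ask the owning goroutine for a copy over a channel. A minimal sketch of that pattern with simplified, hypothetical types (not the project's code):

package main

import "fmt"

type counter struct {
  state    map[string]int // owned exclusively by the run goroutine
  in       chan string
  snapReq  chan bool
  snapResp chan map[string]int
}

// run is the only goroutine that touches state, so no mutex is needed
func (c *counter) run() {
  for {
    select {
    case key := <-c.in:
      c.state[key]++
    case <-c.snapReq:
      // copy the state and hand the copy back to the requester
      snap := make(map[string]int, len(c.state))
      for k, v := range c.state {
        snap[k] = v
      }
      c.snapResp <- snap
    }
  }
}

func main() {
  c := &counter{
    state:    make(map[string]int),
    in:       make(chan string),
    snapReq:  make(chan bool),
    snapResp: make(chan map[string]int),
  }
  go c.run()

  c.in <- "foo"
  c.in <- "foo"
  c.in <- "bar"

  c.snapReq <- true
  fmt.Println(<-c.snapResp) // map[bar:1 foo:2]
}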

Where the Processor interface's Flush method is called

aggregator/aggregator.go#L168-L218

// Flush finalizes and removes aggregations that are due
func (a *Aggregator) Flush(cutoff uint) {
  flushWaiting.Inc(1)
  flushes.Add()
  flushWaiting.Dec(1)
  defer flushes.Done()

  pos := -1 // will track the pos of the last ts position that was successfully processed
  for i, ts := range a.tsList {
    if ts > cutoff {
      break
    }
    agg := a.aggregations[ts]
    for key, proc := range agg.state {
      results, ok := proc.Flush()
      if ok {
        if len(results) == 1 {
          a.out <- []byte(fmt.Sprintf("%s %f %d", key, results[0].val, ts))
          a.numFlushed.Inc(1)
        } else {
          for _, result := range results {
            a.out <- []byte(fmt.Sprintf("%s.%s %f %d", key, result.fcnName, result.val, ts))
            a.numFlushed.Inc(1)
          }
        }
      }
    }
    if aggregatorReporter != nil {
      aggregatorReporter.add(a.Key, uint32(ts), agg.count)
    }
    delete(a.aggregations, ts)
    pos = i
  }
  // now we must delete all the timestamps from the ordered list
  if pos == -1 {
    // we didn't process anything, so no action needed
    return
  }
  if pos == len(a.tsList)-1 {
    // we went through all of them. can just reset the slice
    a.tsList = a.tsList[:0]
    return
  }

  // adjust the slice to only contain the timestamps that still need processing,
  // reusing the backing array
  copy(a.tsList[0:], a.tsList[pos+1:])
  a.tsList = a.tsList[:len(a.tsList)-pos-1]

  //fmt.Println("flush done for ", a.now().Unix(), ". agg size now", len(a.aggregations), a.now())
}
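
The copy plus re-slice at the end of Flush is the usual trick for dropping a processed prefix from a slice while reusing the backing array. A small standalone sketch of the same operation:

package main

import "fmt"

func main() {
  tsList := []uint{60, 120, 180, 240, 300}
  pos := 1 // pretend the buckets at indexes 0 and 1 were flushed

  // shift the unprocessed tail to the front, reusing the backing array,
  // then shrink the length accordingly (same as the end of Flush)
  copy(tsList[0:], tsList[pos+1:])
  tsList = tsList[:len(tsList)-pos-1]

  fmt.Println(tsList) // [180 240 300]
}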

Aggregator's Flush method is called from two places inside the run method.

The Aggregator struct

aggregator/aggregator.go#L16-L53

type Aggregator struct {
  Fun          string `json:"fun"`
  procConstr   func(val float64, ts uint32) Processor
  in           chan msg    `json:"-"` // incoming metrics, already split in 3 fields
  out          chan []byte // outgoing metrics
  Matcher      matcher.Matcher
  OutFmt       string
  outFmt       []byte
  Cache        bool
  reCache      map[string]CacheEntry
  reCacheMutex sync.Mutex
  Interval     uint                  // expected interval between values in seconds, we will quantize to make sure alginment to interval-spaced timestamps
  Wait         uint                  // seconds to wait after quantized time value before flushing final outcome and ignoring future values that are sent too late.
  DropRaw      bool                  // drop raw values "consumed" by this aggregator
  tsList       []uint                // ordered list of quantized timestamps, so we can flush in correct order
  aggregations map[uint]*aggregation // aggregations in process: one for each quantized timestamp and output key, i.e. for each output metric.
  snapReq      chan bool             // chan to issue snapshot requests on
  snapResp     chan *Aggregator      // chan on which snapshot response gets sent
  shutdown     chan struct{}         // chan used internally to shut down
  wg           sync.WaitGroup        // tracks worker running state
  now          func() time.Time      // returns current time. wraps time.Now except in some unit tests
  tick         <-chan time.Time      // controls when to flush

  Key        string
  numIn      metrics.Counter
  numFlushed metrics.Counter
}

type aggregation struct {
  count uint32
  state map[string]Processor
}

type msg struct {
  buf [][]byte
  val float64
  ts  uint32
}

Side note: Aggregator's matchWithCache method

Let's also take a look at the matchWithCache method, which is called from Aggregator's run method.

aggregator/aggregator.go#L252-L279

// matchWithCache returns whether there was a match, and under which key, if so.
func (a *Aggregator) matchWithCache(key []byte) (string, bool) {
  if a.reCache == nil {
    return a.Matcher.MatchRegexAndExpand(key, a.outFmt)
  }

  a.reCacheMutex.Lock()

  var outKey string
  var ok bool
  entry, ok := a.reCache[string(key)]
  if ok {
    entry.seen = uint32(a.now().Unix())
    a.reCache[string(key)] = entry
    a.reCacheMutex.Unlock()
    return entry.key, entry.match
  }

  outKey, ok = a.Matcher.MatchRegexAndExpand(key, a.outFmt)
  a.reCache[string(key)] = CacheEntry{
    ok,
    outKey,
    uint32(a.now().Unix()),
  }
  a.reCacheMutex.Unlock()

  return outKey, ok
}

Here, access to the reCache field is guarded by reCacheMutex. reCache is a cache for the regular-expression matching.

Revisiting Aggregator's AddOrCreate method

Looking back at the AddOrCreate method, on the other hand, access to the aggregations field is not guarded by any mutex. Doesn't this cause a data race? That was the question on my mind.

If the run method of a given Aggregator instance is only ever called from a single goroutine, it seems like it should be fine. But then, you could argue, reCacheMutex should be unnecessary as well.
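
Whether something like this actually is a data race can be checked with Go's race detector. The following sketch, unrelated to carbon-relay-ng itself, is flagged by go run -race (and may even crash with "concurrent map writes") because two goroutines update the same map without synchronization. If only a single goroutine ever touched the map, as is the case for aggregations and the run goroutine, there would be nothing to report.

package main

import "sync"

func main() {
  cache := make(map[string]int)

  var wg sync.WaitGroup
  for i := 0; i < 2; i++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
      for j := 0; j < 1000; j++ {
        cache["key"]++ // unsynchronized access from two goroutines
      }
    }()
  }
  wg.Wait()
}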

Where Aggregator's run is called

Aggregator's run is called from the NewMocked function. NewMocked is called from the New function and from the test code in aggregator_test.go. It is structured this way so that, rather than obtaining the current time with time.Now() directly, the clock is injected (via the now function and the tick channel), which lets tests run with times that differ from real time.

aggregator/aggregator.go#L55-L95

// New creates an aggregator
func New(fun string, matcher matcher.Matcher, outFmt string, cache bool, interval, wait uint, dropRaw bool, out chan []byte) (*Aggregator, error) {
  ticker := clock.AlignedTick(time.Duration(interval)*time.Second, time.Duration(wait)*time.Second, 2)
  return NewMocked(fun, matcher, outFmt, cache, interval, wait, dropRaw, out, 2000, time.Now, ticker)
}

func NewMocked(fun string, matcher matcher.Matcher, outFmt string, cache bool, interval, wait uint, dropRaw bool, out chan []byte, inBuf int, now func() time.Time, tick <-chan time.Time) (*Aggregator, error) {
  procConstr, err := GetProcessorConstructor(fun)
  if err != nil {
    return nil, err
  }

  a := &Aggregator{
    Fun:          fun,
    procConstr:   procConstr,
    in:           make(chan msg, inBuf),
    out:          out,
    Matcher:      matcher,
    OutFmt:       outFmt,
    outFmt:       []byte(outFmt),
    Cache:        cache,
    Interval:     interval,
    Wait:         wait,
    DropRaw:      dropRaw,
    aggregations: make(map[uint]*aggregation),
    snapReq:      make(chan bool),
    snapResp:     make(chan *Aggregator),
    shutdown:     make(chan struct{}),
    now:          now,
    tick:         tick,
  }
  if cache {
    a.reCache = make(map[string]CacheEntry)
  }
  a.setKey()
  a.numIn = stats.Counter("unit=Metric.direction=in.aggregator=" + a.Key)
  a.numFlushed = stats.Counter("unit=Metric.direction=out.aggregator=" + a.Key)
  a.wg.Add(1)
  go a.run()
  return a, nil
}
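
The now and tick parameters of NewMocked are plain dependency injection of the clock. Here is a minimal sketch, independent of the project, of how a component written in this style can be driven with a fake clock in a test:

package main

import (
  "fmt"
  "time"
)

type flusher struct {
  now  func() time.Time // injected clock; time.Now in production
  tick <-chan time.Time // injected ticker; a real ticker channel in production
  out  chan string
}

func (f *flusher) run() {
  for t := range f.tick {
    f.out <- fmt.Sprintf("flush at tick %s (now=%s)", t.UTC(), f.now().UTC())
  }
}

func main() {
  // in a test we control both the "current time" and when ticks fire
  fake := time.Date(2020, 6, 17, 0, 0, 0, 0, time.UTC)
  tick := make(chan time.Time)

  f := &flusher{
    now:  func() time.Time { return fake },
    tick: tick,
    out:  make(chan string),
  }
  go f.run()

  tick <- fake.Add(60 * time.Second) // fire a tick at a time of our choosing
  fmt.Println(<-f.out)
}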

Looking at New and NewMocked, the run method is executed by exactly one goroutine per Aggregator instance.

As for the regular-expression cache: when caching is enabled, cache is true and, as shown above, a separate map is created with make for the reCache field of each Aggregator instance. Since the only places that reference reCache are matchWithCache and the run method, it seemed like reCacheMutex should be unnecessary.

Looking at the history with git log --follow -p aggregator/aggregator.go, reCacheMutex was added in fix race condition in aggregator match cache · grafana/carbon-relay-ng@3939706. At that point it was a pointer, *sync.Mutex, but it was later changed to the non-pointer type sync.Mutex in cleanup & simplify · grafana/carbon-relay-ng@51a241e.

These two commits are part of the pull request Cache mutex2 by Dieterbe · Pull Request #273 · grafana/carbon-relay-ng. I also looked at the pull request it was based on, fix race condition in aggregator match cache by DanCech · Pull Request #271 · grafana/carbon-relay-ng, but there was no explanation at all and no related issue, so the background remained unclear.

Hmm, why the reCache field needs mutual exclusion while the aggregations field does not remained a mystery to me.

Update 2020-06-17: found the reason why the reCache field needs mutual exclusion

It turns out that Aggregator's matchWithCache method is called not only from the run method but also from the AddMaybe method.

aggregator/aggregator.go#L225-L244

func (a *Aggregator) AddMaybe(buf [][]byte, val float64, ts uint32) bool {
  if !a.Matcher.PreMatch(buf[0]) {
    return false
  }

  if a.DropRaw {
    _, ok := a.matchWithCache(buf[0])
    if !ok {
      return false
    }
  }

  a.in <- msg{
    buf,
    val,
    ts,
  }

  return a.DropRaw
}

And the AddMaybe method is called from Table's Dispatch method. Dispatch runs on the goroutine of whatever is feeding data into the table, not on the aggregator's run goroutine, so when DropRaw is enabled, matchWithCache, and therefore reCache, can be accessed from multiple goroutines concurrently. That is why reCacheMutex is needed, while aggregations is only ever touched from the run goroutine.

table/table.go#L109-L177

// Dispatch is the entrypoint to send data into the table.
// it dispatches incoming metrics into matching aggregators and routes,
// after checking against the blacklist
// buf is assumed to have no whitespace at the end
func (table *Table) Dispatch(buf []byte) {
  buf_copy := make([]byte, len(buf))
  copy(buf_copy, buf)
  log.Tracef("table received packet %s", buf_copy)

  table.numIn.Inc(1)

  conf := table.config.Load().(TableConfig)

  key, val, ts, err := m20.ValidatePacket(buf_copy, conf.Validation_level_legacy.Level, conf.Validation_level_m20.Level)
  if err != nil {
    table.bad.Add(key, buf_copy, err)
    table.numInvalid.Inc(1)
    return
  }

  if conf.Validate_order {
    err = validate.Ordered(key, ts)
    if err != nil {
      table.bad.Add(key, buf_copy, err)
      table.numOutOfOrder.Inc(1)
      return
    }
  }

  fields := bytes.Fields(buf_copy)

  for _, matcher := range conf.blacklist {
    if matcher.Match(fields[0]) {
      table.numBlacklist.Inc(1)
      log.Tracef("table dropped %s, matched blacklist entry %s", buf_copy, matcher)
      return
    }
  }

  for _, rw := range conf.rewriters {
    fields[0] = rw.Do(fields[0])
  }

  for _, aggregator := range conf.aggregators {
    // we rely on incoming metrics already having been validated
    dropRaw := aggregator.AddMaybe(fields, val, ts)
    if dropRaw {
      log.Tracef("table dropped %s, matched dropRaw aggregator %s", buf_copy, aggregator.Matcher.Regex)
      return
    }
  }

  final := bytes.Join(fields, []byte(" "))

  routed := false

  for _, route := range conf.routes {
    if route.Match(fields[0]) {
      routed = true
      log.Tracef("table sending to route: %s", final)
      route.Dispatch(final)
    }
  }

  if !routed {
    table.numUnroutable.Inc(1)
    log.Tracef("unrouteable: %s", final)
  }
}
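
To make the two concurrent call paths concrete, here is a minimal sketch with simplified, hypothetical types: one goroutine plays the role of the run loop consuming the in channel, while the caller's goroutine reaches the same cache through an AddMaybe-like method. Removing the lock makes go run -race report a data race on the map.

package main

import (
  "fmt"
  "regexp"
  "sync"
)

type agg struct {
  re      *regexp.Regexp
  mu      sync.Mutex      // plays the role of reCacheMutex
  reCache map[string]bool // plays the role of reCache
  in      chan string
  done    chan struct{}
}

func (a *agg) matchWithCache(key string) bool {
  a.mu.Lock() // remove this lock and go run -race reports a data race
  defer a.mu.Unlock()
  if hit, ok := a.reCache[key]; ok {
    return hit
  }
  m := a.re.MatchString(key)
  a.reCache[key] = m
  return m
}

// run plays the role of the aggregator's run goroutine
func (a *agg) run() {
  for key := range a.in {
    a.matchWithCache(key)
  }
  close(a.done)
}

// AddMaybe runs on the caller's goroutine, like Table.Dispatch calling AddMaybe
func (a *agg) AddMaybe(key string) {
  a.matchWithCache(key) // the DropRaw code path
  a.in <- key
}

func main() {
  a := &agg{
    re:      regexp.MustCompile(`^servers\.`),
    reCache: make(map[string]bool),
    in:      make(chan string, 100),
    done:    make(chan struct{}),
  }
  go a.run()

  // this loop plays the role of the goroutine feeding Table.Dispatch
  for i := 0; i < 1000; i++ {
    a.AddMaybe(fmt.Sprintf("servers.web%d.cpu", i))
  }
  close(a.in)
  <-a.done
  fmt.Println("cache entries:", len(a.reCache))
}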