VictoriaMetricsのクエリのコードリーディング

VictoriaMetrics/VictoriaMetrics: VictoriaMetrics - fast, cost-effective and scalable time series database, long-term remote storage for Prometheus の v1.31.2 のコードリーディングのメモ。

今回は Prometheus QL 互換のクエリ回りを見る。

メイン

メインのリクエストハンドラ。 app/victoria-metrics/main.go#L52-L63

func requestHandler(w http.ResponseWriter, r *http.Request) bool {
  if vminsert.RequestHandler(w, r) {
    return true
  }
  if vmselect.RequestHandler(w, r) {
    return true
  }
  if vmstorage.RequestHandler(w, r) {
    return true
  }
  return false
}

vmselect.RequestHandler app/vmselect/main.go#L57-L199

ラベルバリューのクエリ

Prometheus のドキュメント: Querying label values

grafana で VictoriaMetrics を Prometheus のデータソースとして登録してグラフを見た時は /api/v1/label/__name__/values というパスで呼ばれて

{"status":"success","data":["foo.bar.baz"]}

といったレスポンスが返っていました。

VictoriaMetrics の実装: app/vmselect/main.go#L83-L97

app/vmselect/prometheus パッケージの LabelValuesHandler 関数 app/vmselect/prometheus/prometheus.go#L234-L279

// LabelValuesHandler processes /api/v1/label/<labelName>/values request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values
func LabelValuesHandler(labelName string, w http.ResponseWriter, r *http.Request) error {

app/vmselect/netstorage パッケージの GetLabelValues 関数 app/vmselect/netstorage/netstorage.go#L398-L415

// GetLabelValues returns label values for the given labelName
// until the given deadline.
func GetLabelValues(labelName string, deadline Deadline) ([]string, error) {

app/vmstorage パッケージの SearchTagValues 関数 app/vmstorage/main.go#L106-L112

// SearchTagValues searches for tag values for the given tagKey
func SearchTagValues(tagKey []byte, maxTagValues int) ([]string, error) 

lib/storage パッケージの StorageSearchTagValues メソッド lib/storage/storage.go#L644-L647

// SearchTagValues searches for tag values for the given tagKey
func (s *Storage) SearchTagValues(tagKey []byte, maxTagValues int) ([]string, error) {

Storage 構造体 lib/storage/storage.go#L30-L78

// Storage represents TSDB storage.
type Storage struct {
()
  path            string
  cachePath       string
  retentionMonths int

  // lock file for exclusive access to the storage on the given path.
  flockF *os.File

  idbCurr atomic.Value

  tb *table
()
}

table 構造体 lib/storage/table.go#L16-L33

// table represents a single table with time series data.
type table struct {
  path                string
  smallPartitionsPath string
  bigPartitionsPath   string

  getDeletedMetricIDs func() *uint64set.Set

  ptws     []*partitionWrapper
  ptwsLock sync.Mutex

  flockF *os.File

  stop chan struct{}

  retentionMilliseconds int64
  retentionWatcherWG    sync.WaitGroup
}

partitionWrapper 構造体 lib/storage/table.go#L35-L46

// partitionWrapper provides refcounting mechanism for the partition.
type partitionWrapper struct {
  // Atomic counters must be at the top of struct for proper 8-byte alignment on 32-bit archs.
  // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212

  refCount uint64

  // The partition must be dropped if mustDrop > 0
  mustDrop uint64

  pt *partition
}

partition 構造体 lib/storage/partition.go#L97-L150

// partition represents a partition.
type partition struct {
()
  mergeIdx uint64

  smallPartsPath string
  bigPartsPath   string

  // The callack that returns deleted metric ids which must be skipped during merge.
  getDeletedMetricIDs func() *uint64set.Set

  // Name is the name of the partition in the form YYYY_MM.
  name string

  // The time range for the partition. Usually this is a whole month.
  tr TimeRange
()
  // Contains all the inmemoryPart plus file-based parts
  // with small number of items (up to maxRowsCountPerSmallPart).
  smallParts []*partWrapper

  // Contains file-based parts with big number of items.
  bigParts []*partWrapper

  // rawRows contains recently added rows that haven't been converted into parts yet.
  //
  // rawRows aren't used in search for performance reasons.
  rawRows rawRowsShards
()
}

partWrapper 構造体 lib/storage/partition.go#L152-L168

// partWrapper is a wrapper for the part.
type partWrapper struct {
  // Put atomic counters to the top of struct, so they are aligned to 8 bytes on 32-bit arch.
  // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212

  // The number of references to the part.
  refCount uint64

  // The part itself.
  p *part

  // non-nil if the part is inmemoryPart.
  mp *inmemoryPart

  // Whether the part is in merge now.
  isInMerge bool
}

partpartInternals 構造体 lib/storage/part.go#L31-L57


type partInternals struct {
  ph partHeader

  // Filesystem path to the part.
  //
  // Empty for in-memory part.
  path string

  // Total size in bytes of part data.
  size uint64

  timestampsFile fs.ReadAtCloser
  valuesFile     fs.ReadAtCloser
  indexFile      fs.ReadAtCloser

  metaindex []metaindexRow
}

// part represents a searchable part containing time series data.
type part struct {
  partInternals

  // Align ibCache to 8 bytes in order to align internal counters on 32-bit architectures.
  // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
  _       [(8 - (unsafe.Sizeof(partInternals{}) % 8)) % 8]byte
  ibCache indexBlockCache
}

partHeader 構造体 lib/storage/part_header.go#L11-L24

// partHeader represents part header.
type partHeader struct {
  // RowsCount is the total number of rows in the part.
  RowsCount uint64

  // BlocksCount is the total number of blocks in the part.
  BlocksCount uint64

  // MinTimestamp is the minimum timestamp in the part.
  MinTimestamp int64

  // MaxTimestamp is the maximum timestamp in the part.
  MaxTimestamp int64
}

lib/storage パッケージの indexDBSearchTagValues メソッド lib/storage/index_db.go#L784-L811

// SearchTagValues returns all the tag values for the given tagKey
func (db *indexDB) SearchTagValues(tagKey []byte, maxTagValues int) ([]string, error) {

lib/storage パッケージの indexSearchsearchTagValues メソッド lib/storage/index_db.go#L813-L857

func (is *indexSearch) searchTagValues(tvs map[string]struct{}, tagKey []byte, maxTagValues int) error {

tagKey の箇所にシークしてタグを最大 maxTagValues 個まで tvs のキーに入れる。

lib/mergeset パッケージの TableSearchSeek メソッド lib/mergeset/table_search.go#L83-L117

// Seek seeks for the first item greater or equal to k in the ts.
func (ts *TableSearch) Seek(k []byte) {

lib/mergeset パッケージの partSearchSeek メソッド lib/mergeset/part_search.go#L89-L182

// Seek seeks for the first item greater or equal to k in ps.
func (ps *partSearch) Seek(k []byte) {

関連する型

lib/mergeset パッケージの partSearch 構造体 lib/mergeset/part_search.go#L13-L49

lib/mergeset パッケージの partInternalspart 構造体 lib/mergeset/part.go#L47-L69

lib/mergeset パッケージの metaindexRow 構造体 lib/mergeset/metaindex_row.go#L12-L26

lib/storage パッケージの indexBlock 構造体 lib/storage/part.go#L158-L160

type indexBlock struct {
  bhs []blockHeader
}

lib/mergeset パッケージの blockHeader 構造体 lib/mergeset/block_header.go#L11-L35

lib/mergeset パッケージの inmemoryBlock 構造体と byteSliceSorterlib/mergeset/encoding.go#L15-L29

レンジクエリ

Prometheus のドキュメント: Range queries

grafana で VictoriaMetrics を Prometheus のデータソースとして登録してグラフを見た時は /api/v1/query_range?query=foo.bar.baz&start=1577150160&end=1577153760&step=15 というパスで呼ばれて

{"JSON":{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"foo.bar.baz","tag1":"value1","tag2":"value2"},"values":[[1577059950,"123"],[1577059965,"123"],…(略)…,[1577060700,"130"],[1577060715,"130"],…(略)…,[1577061540,"130"]]}]}}

といったレスポンスが返っていました。

VictoriaMetrics の実装: app/vmselect/main.go#L109-L117

app/vmselect/prometheus パッケージの QueryRangeHandler 関数 app/vmselect/prometheus/prometheus.go#L647-L675

app/vmselect/prometheus パッケージの queryRangeHandler 関数 app/vmselect/prometheus/prometheus.go#L677-L723

app/vmselect/promql パッケージの Exec 関数 app/vmselect/promql/exec.go#L32-L86

// Exec executes q for the given ec.
func Exec(ec *EvalConfig, q string, isFirstPointOnly bool) ([]netstorage.Result, error) {

app/vmselect/promql パッケージの evalExpr 関数 app/vmselect/promql/eval.go#L147-L271

func evalExpr(ec *EvalConfig, e expr) ([]*timeseries, error) {

app/vmselect/promql パッケージの evalRollupFunc 関数 app/vmselect/promql/eval.go#L397-L436

func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, re *rollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {

app/vmselect/promql パッケージの evalRollupFuncWithMetricExpr 関数 app/vmselect/promql/eval.go#L547-L636

func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me *metricExpr, iafc *incrementalAggrFuncContext, windowStr string) ([]*timeseries, error) {

app/vmselect/netstorage パッケージの ProcessSearchQuery 関数 app/vmselect/netstorage/netstorage.go#L468-L539

// ProcessSearchQuery performs sq on storage nodes until the given deadline.
func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline Deadline) (*Results, error) {

app/vmselect/netstorage パッケージの Result 構造体 app/vmselect/netstorage/netstorage.go#L50-L59

// Results holds results returned from ProcessSearchQuery.
type Results struct {
  tr        storage.TimeRange
  fetchData bool
  deadline  Deadline

  tbf *tmpBlocksFile

  packedTimeseries []packedTimeseries
}

packedTimeseries 構造体 app/vmselect/netstorage/netstorage.go#L157-L160

type packedTimeseries struct {
  metricName string
  addrs      []tmpBlockAddr
}

tmpBlockAddr 構造体 app/vmselect/netstorage/tmp_blocks_file.go#L74-L77

type tmpBlockAddr struct {
  offset uint64
  size   int
}

lib/storage パッケージの SearchInit メソッド lib/storage/search.go#L103-L127

// Init initializes s from the given storage, tfss and tr.
//
// MustClose must be called when the search is done.
func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, fetchData bool, maxMetrics int) {

Search 構造体 lib/storage/search.go#L79-L91

// Search is a search for time series.
type Search struct {
  // MetricBlock is updated with each Search.NextMetricBlock call.
  MetricBlock MetricBlock

  storage *Storage

  ts tableSearch

  err error

  needClosing bool
}

evalRollupWithIncrementalAggregate 関数 app/vmselect/promql/eval.go#L650-L671

func evalRollupWithIncrementalAggregate(iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig,
  preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) {
  err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
    preFunc(rs.Values, rs.Timestamps)
    ts := getTimeseries()
    defer putTimeseries(ts)
    for _, rc := range rcs {
      ts.Reset()
      doRollupForTimeseries(rc, ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps, removeMetricGroup)
      iafc.updateTimeseries(ts, workerID)

      // ts.Timestamps points to sharedTimestamps. Zero it, so it can be re-used.
      ts.Timestamps = nil
      ts.denyReuse = false
    }
  })
  if err != nil {
    return nil, err
  }
  tss := iafc.finalizeTimeseries()
  return tss, nil
}

doRollupForTimeseries 関数 app/vmselect/promql/eval.go#L693-L705

func doRollupForTimeseries(rc *rollupConfig, tsDst *timeseries, mnSrc *storage.MetricName, valuesSrc []float64, timestampsSrc []int64,
  sharedTimestamps []int64, removeMetricGroup bool) {
  tsDst.MetricName.CopyFrom(mnSrc)
  if len(rc.TagValue) > 0 {
    tsDst.MetricName.AddTag("rollup", rc.TagValue)
  }
  if removeMetricGroup {
    tsDst.MetricName.ResetMetricGroup()
  }
  tsDst.Values = rc.Do(tsDst.Values[:0], valuesSrc, timestampsSrc)
  tsDst.Timestamps = sharedTimestamps
  tsDst.denyReuse = true
}

rollupConfig 構造体と rollupFuncapp/vmselect/promql/rollup.go#L133-L159

// rollupFunc must return rollup value for the given rfa.
//
// prevValue may be nan, values and timestamps may be empty.
type rollupFunc func(rfa *rollupFuncArg) float64

type rollupConfig struct {
  // This tag value must be added to "rollup" tag if non-empty.
  TagValue string

  Func   rollupFunc
  Start  int64
  End    int64
  Step   int64
  Window int64

  // Whether window may be adjusted to 2 x interval between data points.
  // This is needed for functions which have dt in the denominator
  // such as rate, deriv, etc.
  // Without the adjustement their value would jump in unexpected directions
  // when using window smaller than 2 x scrape_interval.
  MayAdjustWindow bool

  Timestamps []int64

  // LoookbackDelta is the analog to `-query.lookback-delta` from Prometheus world.
  LookbackDelta int64
}

lib/storage パッケージの tableSearchInit メソッド lib/storage/table_search.go#L55-L120

// Init initializes the ts.
//
// tsids must be sorted.
// tsids cannot be modified after the Init call, since it is owned by ts.
//
// MustClose must be called then the tableSearch is done.
func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange, fetchData bool) {

今回はこのへんで。