VictoriaMetrics/fastcache Code Reading, Part 2

Introduction

After VictoriaMetrics/fastcache Code Reading, Part 1, new commits had landed in the repository, so this time the target is commit 2dd9480.

This time the subject is writing the cache out to a file and reading it back in. The code lives in file.go.

Code Reading

The SaveToFile method

file.go#L16-L26

// SaveToFile atomically saves cache data to the given filePath using a single
// CPU core.
//
// SaveToFile may be called concurrently with other operations on the cache.
//
// The saved data may be loaded with LoadFromFile*.
//
// See also SaveToFileConcurrent for faster saving to file.
func (c *Cache) SaveToFile(filePath string) error {
  return c.SaveToFileConcurrent(filePath, 1)
}

A thin wrapper that calls SaveToFileConcurrent with a concurrency of 1.
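
As a quick illustration, here is a minimal usage sketch of the save API (the 32 MiB capacity and the file path are arbitrary examples, not taken from the article's code):

package main

import (
  "log"

  "github.com/VictoriaMetrics/fastcache"
)

func main() {
  // Build a small cache and persist it to disk atomically.
  c := fastcache.New(32 * 1024 * 1024)
  c.Set([]byte("key"), []byte("value"))
  if err := c.SaveToFile("/tmp/fastcache-data"); err != nil {
    log.Fatalf("cannot save cache: %s", err)
  }
}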

The SaveToFileConcurrent method

file.go#L28-L77

// SaveToFileConcurrent saves cache data to the given filePath using concurrency
// CPU cores.
//
// SaveToFileConcurrent may be called concurrently with other operations
// on the cache.
//
// The saved data may be loaded with LoadFromFile*.
//
// See also SaveToFile.
func (c *Cache) SaveToFileConcurrent(filePath string, concurrency int) error {
  // Create dir if it doesn't exist.
  dir := filepath.Dir(filePath)
  if _, err := os.Stat(dir); err != nil {
    if !os.IsNotExist(err) {
      return fmt.Errorf("cannot stat %q: %s", dir, err)
    }
    if err := os.MkdirAll(dir, 0755); err != nil {
      return fmt.Errorf("cannot create dir %q: %s", dir, err)
    }
  }

  // Save cache data into a temporary directory.
  tmpDir, err := ioutil.TempDir(dir, "fastcache.tmp.")
  if err != nil {
    return fmt.Errorf("cannot create temporary dir inside %q: %s", dir, err)
  }
  defer func() {
    if tmpDir != "" {
      _ = os.RemoveAll(tmpDir)
    }
  }()
  gomaxprocs := runtime.GOMAXPROCS(-1)
  if concurrency <= 0 || concurrency > gomaxprocs {
    concurrency = gomaxprocs
  }
  if err := c.save(tmpDir, concurrency); err != nil {
    return fmt.Errorf("cannot save cache data to temporary dir %q: %s", tmpDir, err)
  }

  // Remove old filePath contents, since os.Rename may return
  // error if filePath dir exists.
  if err := os.RemoveAll(filePath); err != nil {
    return fmt.Errorf("cannot remove old contents at %q: %s", filePath, err)
  }
  if err := os.Rename(tmpDir, filePath); err != nil {
    return fmt.Errorf("cannot move temporary dir %q to %q: %s", tmpDir, filePath, err)
  }
  tmpDir = ""
  return nil
}
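
The interesting part is the atomicity: the data is first written into a temporary directory created inside the destination's parent (so the final os.Rename stays on the same filesystem), the old contents are removed, and only then is the temporary directory renamed into place. Setting tmpDir to "" at the end turns the deferred cleanup into a no-op on the success path, while any early return still removes the temporary directory. The same idiom, reduced to a standalone helper (atomicWriteDir and its parameters are hypothetical names, not fastcache API):

package main

import (
  "io/ioutil"
  "log"
  "os"
  "path/filepath"
)

// atomicWriteDir fills a temporary directory next to dstPath and then
// publishes it with a single rename, so readers never observe a
// half-written directory.
func atomicWriteDir(dstPath string, fill func(tmpDir string) error) error {
  tmpDir, err := ioutil.TempDir(filepath.Dir(dstPath), "tmp.")
  if err != nil {
    return err
  }
  defer os.RemoveAll(tmpDir) // harmless no-op once the rename succeeds
  if err := fill(tmpDir); err != nil {
    return err
  }
  if err := os.RemoveAll(dstPath); err != nil {
    return err
  }
  return os.Rename(tmpDir, dstPath)
}

func main() {
  err := atomicWriteDir("/tmp/out", func(dir string) error {
    return ioutil.WriteFile(filepath.Join(dir, "data.bin"), []byte("hello"), 0644)
  })
  if err != nil {
    log.Fatal(err)
  }
}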

The LoadFromFile function

file.go#L79-L84

// LoadFromFile loads cache data from the given filePath.
//
// See SaveToFile* for saving cache data to file.
func LoadFromFile(filePath string) (*Cache, error) {
  return load(filePath, 0)
}

It calls the load function with maxBytes set to 0, which (as seen below) skips the capacity-consistency check inside load.

The LoadFromFileOrNew function

file.go#L86-L96

// LoadFromFileOrNew tries loading cache data from the given filePath.
//
// The function falls back to creating new cache with the given maxBytes
// capacity if error occurs during loading the cache from file.
func LoadFromFileOrNew(filePath string, maxBytes int) *Cache {
  c, err := load(filePath, maxBytes)
  if err == nil {
    return c
  }
  return New(maxBytes)
}

It tries to read the cache back with load and returns the result on success; on any error it falls back to creating a fresh cache with New.
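
A typical warm-start pattern, assuming a snapshot may or may not exist at the (arbitrary) path:

package main

import (
  "fmt"

  "github.com/VictoriaMetrics/fastcache"
)

func main() {
  // Reuse the previous snapshot if it loads cleanly; otherwise start
  // with an empty 32 MiB cache.
  c := fastcache.LoadFromFileOrNew("/tmp/fastcache-data", 32*1024*1024)
  v := c.Get(nil, []byte("key"))
  fmt.Printf("key -> %q\n", v)
}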

The Cache.save method

file.go#L98-L126

func (c *Cache) save(dir string, workersCount int) error {
  if err := saveMetadata(c, dir); err != nil {
    return err
  }

  // Save buckets by workersCount concurrent workers.
  workCh := make(chan int, workersCount)
  results := make(chan error)
  for i := 0; i < workersCount; i++ {
    go func(workerNum int) {
      results <- saveBuckets(c.buckets[:], workCh, dir, workerNum)
    }(i)
  }
  // Feed workers with work
  for i := range c.buckets[:] {
    workCh <- i
  }
  close(workCh)

  // Read results.
  var err error
  for i := 0; i < workersCount; i++ {
    result := <-results
    if result != nil && err == nil {
      err = result
    }
  }
  return err
}
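
The shape is a classic fan-out/fan-in: workersCount goroutines range over workCh, the caller feeds every bucket index and closes the channel, and each worker sends exactly one error (possibly nil) on results. Distilled into a standalone sketch (runWorkers is an illustrative name, not fastcache API; unlike saveBuckets, a worker here keeps draining jobs after an error, so the feeder can never block):

package main

import (
  "fmt"
)

// runWorkers fans jobs out to a fixed pool of goroutines over a shared
// channel and collects the first error reported by each worker.
func runWorkers(jobs, workers int, do func(i int) error) error {
  workCh := make(chan int, workers)
  results := make(chan error, workers)
  for w := 0; w < workers; w++ {
    go func() {
      var werr error
      for i := range workCh {
        if err := do(i); err != nil && werr == nil {
          werr = err
        }
      }
      results <- werr
    }()
  }
  for i := 0; i < jobs; i++ {
    workCh <- i
  }
  close(workCh)
  var err error
  for w := 0; w < workers; w++ {
    if werr := <-results; werr != nil && err == nil {
      err = werr
    }
  }
  return err
}

func main() {
  err := runWorkers(512, 4, func(i int) error {
    fmt.Println("saving bucket", i)
    return nil
  })
  fmt.Println("done:", err)
}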

The load function

file.go#L128-L177

func load(filePath string, maxBytes int) (*Cache, error) {
  maxBucketChunks, err := loadMetadata(filePath)
  if err != nil {
    return nil, err
  }
  if maxBytes > 0 {
    maxBucketBytes := uint64((maxBytes + bucketsCount - 1) / bucketsCount)
    expectedBucketChunks := (maxBucketBytes + chunkSize - 1) / chunkSize
    if maxBucketChunks != expectedBucketChunks {
      return nil, fmt.Errorf("cache file %s contains maxBytes=%d; want %d", filePath, maxBytes, expectedBucketChunks*chunkSize*bucketsCount)
    }
  }

  // Read bucket files from filePath dir.
  d, err := os.Open(filePath)
  if err != nil {
    return nil, fmt.Errorf("cannot open %q: %s", filePath, err)
  }
  defer func() {
    _ = d.Close()
  }()
  fis, err := d.Readdir(-1)
  if err != nil {
    return nil, fmt.Errorf("cannot read files from %q: %s", filePath, err)
  }
  results := make(chan error)
  workersCount := 0
  var c Cache
  for _, fi := range fis {
    fn := fi.Name()
    if fi.IsDir() || !dataFileRegexp.MatchString(fn) {
      continue
    }
    workersCount++
    go func(dataPath string) {
      results <- loadBuckets(c.buckets[:], dataPath, maxBucketChunks)
    }(filePath + "/" + fn)
  }
  err = nil
  for i := 0; i < workersCount; i++ {
    result := <-results
    if result != nil && err == nil {
      err = result
    }
  }
  if err != nil {
    return nil, err
  }
  return &c, nil
}
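
The maxBytes check at the top redoes the ceiling divisions that New performs when sizing a cache: maxBucketBytes = ceil(maxBytes / bucketsCount) and expectedBucketChunks = ceil(maxBucketBytes / chunkSize). With fastcache's constants bucketsCount = 512 and chunkSize = 64 KiB, maxBytes = 32 MiB gives maxBucketBytes = 65536 and expectedBucketChunks = 1, so a snapshot whose per-bucket chunk count does not match is rejected rather than silently resized. The rest of the function fans the matching data files out to one loadBuckets goroutine each and keeps the first error, mirroring save.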

The dataFileRegexp variable is defined at file.go#L214:

var dataFileRegexp = regexp.MustCompile(`^data\.\d+\.bin$`)

The saveMetadata function

file.go#L179-L193

func saveMetadata(c *Cache, dir string) error {
  metadataPath := dir + "/metadata.bin"
  metadataFile, err := os.Create(metadataPath)
  if err != nil {
    return fmt.Errorf("cannot create %q: %s", metadataPath, err)
  }
  defer func() {
    _ = metadataFile.Close()
  }()
  maxBucketChunks := uint64(cap(c.buckets[0].chunks))
  if err := writeUint64(metadataFile, maxBucketChunks); err != nil {
    return fmt.Errorf("cannot write maxBucketChunks=%d to %q: %s", maxBucketChunks, metadataPath, err)
  }
  return nil
}
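
So metadata.bin is nothing more than a single little-endian uint64: the per-bucket chunk capacity, read from cap(c.buckets[0].chunks) (all buckets are sized identically at construction time).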

The loadMetadata function

file.go#L195-L212

func loadMetadata(dir string) (uint64, error) {
  metadataPath := dir + "/metadata.bin"
  metadataFile, err := os.Open(metadataPath)
  if err != nil {
    return 0, fmt.Errorf("cannot open %q: %s", metadataPath, err)
  }
  defer func() {
    _ = metadataFile.Close()
  }()
  maxBucketChunks, err := readUint64(metadataFile)
  if err != nil {
    return 0, fmt.Errorf("cannot read maxBucketChunks from %q: %s", metadataPath, err)
  }
  if maxBucketChunks == 0 {
    return 0, fmt.Errorf("invalid maxBucketChunks=0 read from %q", metadataPath)
  }
  return maxBucketChunks, nil
}

The saveBuckets function

file.go#L216-L238

func saveBuckets(buckets []bucket, workCh <-chan int, dir string, workerNum int) error {
  dataPath := fmt.Sprintf("%s/data.%d.bin", dir, workerNum)
  dataFile, err := os.Create(dataPath)
  if err != nil {
    return fmt.Errorf("cannot create %q: %s", dataPath, err)
  }
  defer func() {
    _ = dataFile.Close()
  }()
  zw := snappy.NewBufferedWriter(dataFile)
  for bucketNum := range workCh {
    if err := writeUint64(zw, uint64(bucketNum)); err != nil {
      return fmt.Errorf("cannot write bucketNum=%d to %q: %s", bucketNum, dataPath, err)
    }
    if err := buckets[bucketNum].Save(zw); err != nil {
      return fmt.Errorf("cannot save bucket[%d] to %q: %s", bucketNum, dataPath, err)
    }
  }
  if err := zw.Close(); err != nil {
    return fmt.Errorf("cannot close snappy.Writer for %q: %s", dataPath, err)
  }
  return nil
}
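
Each worker therefore produces one snappy-compressed file, data.<workerNum>.bin, whose payload is a sequence of records: a little-endian uint64 bucket number followed by that bucket's serialized form (see bucket.Save below). Which buckets end up in which file depends on goroutine scheduling, which is why load simply hands every file matching dataFileRegexp to loadBuckets rather than assuming a fixed assignment.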

The loadBuckets function

file.go#L240-L262

func loadBuckets(buckets []bucket, dataPath string, maxChunks uint64) error {
  dataFile, err := os.Open(dataPath)
  if err != nil {
    return fmt.Errorf("cannot open %q: %s", dataPath, err)
  }
  defer func() {
    _ = dataFile.Close()
  }()
  zr := snappy.NewReader(dataFile)
  for {
    bucketNum, err := readUint64(zr)
    if err == io.EOF {
      // Reached the end of file.
      return nil
    }
    if bucketNum >= uint64(len(buckets)) {
      return fmt.Errorf("unexpected bucketNum read from %q: %d; must be smaller than %d", dataPath, bucketNum, len(buckets))
    }
    if err := buckets[bucketNum].Load(zr, maxChunks); err != nil {
      return fmt.Errorf("cannot load bucket[%d] from %q: %s", bucketNum, dataPath, err)
    }
  }
}

The bucket.Save method

file.go#L264-L315

func (b *bucket) Save(w io.Writer) error {
  b.Clean()

  b.mu.RLock()
  defer b.mu.RUnlock()

  // Store b.idx, b.gen and b.m to w.

  bIdx := b.idx
  bGen := b.gen
  chunksLen := 0
  for _, chunk := range b.chunks {
    if chunk == nil {
      break
    }
    chunksLen++
  }
  kvs := make([]byte, 0, 2*8*len(b.m))
  var u64Buf [8]byte
  for k, v := range b.m {
    binary.LittleEndian.PutUint64(u64Buf[:], k)
    kvs = append(kvs, u64Buf[:]...)
    binary.LittleEndian.PutUint64(u64Buf[:], v)
    kvs = append(kvs, u64Buf[:]...)
  }

  if err := writeUint64(w, bIdx); err != nil {
    return fmt.Errorf("cannot write b.idx: %s", err)
  }
  if err := writeUint64(w, bGen); err != nil {
    return fmt.Errorf("cannot write b.gen: %s", err)
  }
  if err := writeUint64(w, uint64(len(kvs))/2/8); err != nil {
    return fmt.Errorf("cannot write len(b.m): %s", err)
  }
  if _, err := w.Write(kvs); err != nil {
    return fmt.Errorf("cannot write b.m: %s", err)
  }

  // Store b.chunks to w.
  if err := writeUint64(w, uint64(chunksLen)); err != nil {
    return fmt.Errorf("cannot write len(b.chunks): %s", err)
  }
  for chunkIdx := 0; chunkIdx < chunksLen; chunkIdx++ {
    chunk := b.chunks[chunkIdx][:chunkSize]
    if _, err := w.Write(chunk); err != nil {
      return fmt.Errorf("cannot write b.chunks[%d]: %s", chunkIdx, err)
    }
  }

  return nil
}
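
The on-disk layout of a bucket is therefore, in order:

- b.idx (uint64): the current write offset into the chunk ring
- b.gen (uint64): the generation counter
- the number of entries in b.m (uint64)
- the entries themselves, each key and value as a little-endian uint64
- the number of non-nil chunks (uint64)
- the chunks, each written as a full chunkSize bytes

Note that even the partially filled current chunk is written out to its full chunkSize; Load recovers the actual fill level from b.idx % chunkSize.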

The bucket.Load method

file.go#L317-L393

func (b *bucket) Load(r io.Reader, maxChunks uint64) error {
  if maxChunks == 0 {
    return fmt.Errorf("the number of chunks per bucket cannot be zero")
  }
  bIdx, err := readUint64(r)
  if err != nil {
    return fmt.Errorf("cannot read b.idx: %s", err)
  }
  bGen, err := readUint64(r)
  if err != nil {
    return fmt.Errorf("cannot read b.gen: %s", err)
  }
  kvsLen, err := readUint64(r)
  if err != nil {
    return fmt.Errorf("cannot read len(b.m): %s", err)
  }
  kvsLen *= 2 * 8
  kvs := make([]byte, kvsLen)
  if _, err := io.ReadFull(r, kvs); err != nil {
    return fmt.Errorf("cannot read b.m: %s", err)
  }
  m := make(map[uint64]uint64, kvsLen/2/8)
  for len(kvs) > 0 {
    k := binary.LittleEndian.Uint64(kvs)
    kvs = kvs[8:]
    v := binary.LittleEndian.Uint64(kvs)
    kvs = kvs[8:]
    m[k] = v
  }

  maxBytes := maxChunks * chunkSize
  if maxBytes >= maxBucketSize {
    return fmt.Errorf("too big maxBytes=%d; should be smaller than %d", maxBytes, maxBucketSize)
  }
  chunks := make([][]byte, maxChunks)
  chunksLen, err := readUint64(r)
  if err != nil {
    return fmt.Errorf("cannot read len(b.chunks): %s", err)
  }
  if chunksLen > uint64(maxChunks) {
    return fmt.Errorf("chunksLen=%d cannot exceed maxChunks=%d", chunksLen, maxChunks)
  }
  currChunkIdx := bIdx / chunkSize
  if currChunkIdx > 0 && currChunkIdx >= chunksLen {
    return fmt.Errorf("too big bIdx=%d; should be smaller than %d", bIdx, chunksLen*chunkSize)
  }
  for chunkIdx := uint64(0); chunkIdx < chunksLen; chunkIdx++ {
    chunk := getChunk()
    chunks[chunkIdx] = chunk
    if _, err := io.ReadFull(r, chunk); err != nil {
      // Free up allocated chunks before returning the error.
      for _, chunk := range chunks {
        if chunk != nil {
          putChunk(chunk)
        }
      }
      return fmt.Errorf("cannot read b.chunks[%d]: %s", chunkIdx, err)
    }
  }
  // Adjust len for the chunk pointed by currChunkIdx.
  if chunksLen > 0 {
    chunkLen := bIdx % chunkSize
    chunks[currChunkIdx] = chunks[currChunkIdx][:chunkLen]
  }

  b.mu.Lock()
  for _, chunk := range b.chunks {
    putChunk(chunk)
  }
  b.chunks = chunks
  b.m = m
  b.idx = bIdx
  b.gen = bGen
  b.mu.Unlock()

  return nil
}
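
Two details are easy to miss here: the chunks for the incoming data come from getChunk, the same chunk allocator that backs live buckets (mmap-based, as discussed in Part 1), and b.mu is held only for the final swap after all file I/O is done, with the displaced chunks returned to the pool via putChunk.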

The writeUint64 function

file.go#L395-L400

func writeUint64(w io.Writer, u uint64) error {
  var u64Buf [8]byte
  binary.LittleEndian.PutUint64(u64Buf[:], u)
  _, err := w.Write(u64Buf[:])
  return err
}

The readUint64 function

file.go#L402-L409

func readUint64(r io.Reader) (uint64, error) {
  var u64Buf [8]byte
  if _, err := io.ReadFull(r, u64Buf[:]); err != nil {
    return 0, err
  }
  u := binary.LittleEndian.Uint64(u64Buf[:])
  return u, nil
}
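
Both helpers move a uint64 through a stack-allocated 8-byte buffer in little-endian order, matching the byte order bucket.Save uses for map entries. On the read side, io.ReadFull turns a short read into an error, and a clean io.EOF at a record boundary is exactly what loadBuckets relies on to terminate its loop.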

Summary

Together with VictoriaMetrics/fastcache Code Reading, Part 1, this wraps up the main code paths.

The deliberate trade-off of silently overwriting entries on hash collisions surely contributes to the speed, but it would leave me somewhat uneasy if I were to use the library myself.

Overall, the code was highly instructive as a model of building data structures on top of mmap-allocated memory.