Litestreamのコードリーディングその2

はじめに

対象バージョン https://github.com/benbjohnson/litestream/tree/e6f7c6052d84b7265fd54d3a3ab33208948e126b

前回: Litestreamのコードリーディング

今回は upstream の URL を指定した場合の挙動関連。

DB 構造体

// DB represents a managed instance of a SQLite database in the file system.

type DB struct

DB 構造体の StreamClient のコメント

	// Client used to receive live, upstream changes. If specified, then
	// DB should be used as read-only as local changes will conflict with
	// upstream changes.
	StreamClient StreamClient

StreamClient インタフェース

// StreamClient represents a client for streaming changes to a replica DB.
type StreamClient interface {
	// Stream returns a reader which contains an optional snapshot followed
	// by a series of WAL segments. This stream begins from the given position.
	//
	// ctx is the context for the call and pos is the position at which the
	// returned stream starts. A non-nil error means no stream could be provided.
	Stream(ctx context.Context, pos Pos) (StreamReader, error)
}

db.StreamClient を設定しているのは1箇所

cmd/litestream/main.go#L318

db.StreamClient = http.NewClient(upstreamURL, upstreamPath)

// NewDBFromConfigWithPath instantiates a DB based on a configuration and using a given path.

func NewDBFromConfigWithPath(dbc *DBConfig, path string) (*litestream.DB, error) 関数内

NewDBFromConfigWithPath 関数を呼ぶのは2箇所

$ vgrep NewDBFromConfigWithPath
Index File                        Line Content
    0 cmd/litestream/main.go       303 return NewDBFromConfigWithPath(dbc, path)
    1 cmd/litestream/main.go       306 // NewDBFromConfigWithPath instantiates a DB based on a configuration and using a given path.
    2 cmd/litestream/main.go       307 func NewDBFromConfigWithPath(dbc *DBConfig, path string) (*litestream.DB, error) {
    3 cmd/litestream/replicate.go  121 return NewDBFromConfigWithPath(dbConfig, path)

db.StreamClient の Stream メソッドの呼び出し元

$ vgrep db.StreamClient
Index File                   Line Content
    0 cmd/litestream/main.go  318 db.StreamClient = http.NewClient(upstreamURL, upstreamPath)
    1 db.go                   930 if db.StreamClient != nil {
    2 db.go                  1649 if db.StreamClient != nil {
    3 db.go                  1716 sr, err := db.StreamClient.Stream(ctx, pos)

// Continuously stream and apply records from client.

sr, err := db.StreamClient.Stream(ctx, pos)

// stream initializes the local database and continuously streams new upstream data.

func (db *DB) stream(ctx context.Context) error メソッド内

// monitorUpstream runs in a separate goroutine and streams data into the local DB.

func (db *DB) monitorUpstream(ctx context.Context) error

func (db *DB) monitor(ctx context.Context) error

	// If an upstream client is specified, then we should simply stream changes
	// into the database. If it is not specified, then we should monitor the
	// database for local changes and replicate them out.
	db.g.Go(func() error { return db.monitor(db.ctx) })

db.go#L428

// Open initializes the background monitoring goroutine.

func (db *DB) Open() (err error)

	// Start watching the database for changes.
	if err := db.Open(); err != nil {

server.go#L106-L107

// Watch adds a database path to be managed by the server.

func (s *Server) Watch(path string, fn func(path string) (*DB, error)) error

		if err := c.server.Watch(path, func(path string) (*litestream.DB, error) {
			return NewDBFromConfigWithPath(dbConfig, path)
		}); err != nil {

cmd/litestream/replicate.go#L120-L122

// Run loads all databases specified in the configuration.

func (c *ReplicateCommand) Run(ctx context.Context) (err error)

db.stream メソッド（内部で db.StreamClient.Stream を呼び出す）の処理を追ってみる

// stream initializes the local database and continuously streams new upstream data.

func (db *DB) stream(ctx context.Context) error

// stream initializes the local database and continuously streams new upstream data.
//
// It reads the saved position with readPositionFile, opens a stream from
// db.StreamClient starting at that position, initializes the replica using the
// page size reported by the stream, and then loops over the stream's records,
// handing snapshots to streamSnapshot and WAL segments to streamWALSegment.
//
// The record loop has no success exit: every return path carries an error,
// coming from reading the position, connecting, initializing the replica,
// fetching the next record, applying a record, or meeting an unknown record type.
func (db *DB) stream(ctx context.Context) error {
	// Resume from the position recorded in the position file.
	pos, err := db.readPositionFile()
	if err != nil {
		return fmt.Errorf("read position file: %w", err)
	}

	// Continuously stream and apply records from client.
	sr, err := db.StreamClient.Stream(ctx, pos)
	if err != nil {
		return fmt.Errorf("stream connect: %w", err)
	}
	// Release the stream reader when this method returns.
	defer sr.Close()

	// Initialize the database and create it if it doesn't exist.
	// This happens after connecting because the page size comes from the stream.
	if err := db.initReplica(sr.PageSize()); err != nil {
		return fmt.Errorf("init replica: %w", err)
	}

	// Apply records one at a time until an error occurs.
	for {
		hdr, err := sr.Next()
		if err != nil {
			// Returned as-is, without the context prefix used by the other error paths.
			return err
		}

		// Dispatch on the record type; sr is passed along as the reader for the record.
		switch hdr.Type {
		case StreamRecordTypeSnapshot:
			if err := db.streamSnapshot(ctx, hdr, sr); err != nil {
				return fmt.Errorf("snapshot: %w", err)
			}
		case StreamRecordTypeWALSegment:
			if err := db.streamWALSegment(ctx, hdr, sr); err != nil {
				return fmt.Errorf("wal segment: %w", err)
			}
		default:
			// Unknown record types abort streaming rather than being skipped.
			return fmt.Errorf("invalid stream record type: 0x%02x", hdr.Type)
		}
	}
}

db.streamSnapshot メソッド

// streamSnapshot reads the snapshot into the WAL and applies it to the main database.

func (db *DB) streamSnapshot(ctx context.Context, hdr *StreamRecordHeader, r io.Reader) error

db.streamWALSegment メソッド

// streamWALSegment rewrites a WAL segment into the local WAL and applies it to the main database.

func (db *DB) streamWALSegment(ctx context.Context, hdr *StreamRecordHeader, r io.Reader) error

invalidateSHMFile 関数

// invalidateSHMFile clears the iVersion field of the -shm file in order that
// the next transaction will rebuild it.

func invalidateSHMFile(dbPath string) error

DB.writePositionFile メソッド

// writePositionFile writes pos as the current position.

func (db *DB) writePositionFile(pos Pos) error

(db *DB) PositionPath() string

// PositionPath returns the path of the file that stores the current position.
// This file is only used to communicate state to external processes.
//
// The path is the entry named "position" inside the directory returned by
// MetaPath.
func (db *DB) PositionPath() string {
	return filepath.Join(db.MetaPath(), "position")
}

ファイル構成

レプリケーション元のディレクトリ・ファイル構成

DBファイル、共有メモリファイル、WALファイル

$ LC_ALL=C ls -lR source.db{,-shm,-wal}
-rw-r--r-- 1 hnakamur hnakamur 16384 May 12 17:05 source.db
-rw-r--r-- 1 hnakamur hnakamur 32768 May 12 17:05 source.db-shm
-rw-r--r-- 1 hnakamur hnakamur 37112 May 12 17:05 source.db-wal

func (db *DB) MetaPath() string が指すのは、レプリケーション元の DB ファイル名に -litestream を加えた名前のディレクトリ

その配下のディレクトリ・ファイル構成の例

$ LC_ALL=C ls -lR source.db-litestream/
source.db-litestream/:
total 3
-rw-r--r-- 1 hnakamur hnakamur 17 May 12 17:04 generation
drwxrwxr-x 3 hnakamur hnakamur  3 May 12 17:04 generations
-rw-r--r-- 1 hnakamur hnakamur 51 May 12 17:05 position

source.db-litestream/generations:
total 1
drwxrwxr-x 3 hnakamur hnakamur 3 May 12 17:04 40e9bff6b361ab2f

source.db-litestream/generations/40e9bff6b361ab2f:
total 1
drwxrwxr-x 4 hnakamur hnakamur 4 May 12 17:05 wal

source.db-litestream/generations/40e9bff6b361ab2f/wal:
total 10
drwxrwxr-x 2 hnakamur hnakamur 9 May 12 17:05 0000000000000000
drwxrwxr-x 2 hnakamur hnakamur 4 May 12 17:05 0000000000000001

source.db-litestream/generations/40e9bff6b361ab2f/wal/0000000000000000:
total 19
-rw-r--r-- 1 hnakamur hnakamur  51 May 12 17:04 0000000000000000.wal.lz4
-rw-r--r-- 1 hnakamur hnakamur 444 May 12 17:04 0000000000000020.wal.lz4
-rw-r--r-- 1 hnakamur hnakamur  91 May 12 17:05 0000000000004080.wal.lz4
-rw-r--r-- 1 hnakamur hnakamur  97 May 12 17:05 0000000000005098.wal.lz4
-rw-r--r-- 1 hnakamur hnakamur 102 May 12 17:05 00000000000060b0.wal.lz4
-rw-r--r-- 1 hnakamur hnakamur 111 May 12 17:05 00000000000070c8.wal.lz4
-rw-r--r-- 1 hnakamur hnakamur 122 May 12 17:05 00000000000080e0.wal.lz4

source.db-litestream/generations/40e9bff6b361ab2f/wal/0000000000000001:
total 2
-rw-r--r-- 1 hnakamur hnakamur 51 May 12 17:05 0000000000000000.wal.lz4
-rw-r--r-- 1 hnakamur hnakamur 91 May 12 17:05 0000000000000020.wal.lz4

レプリケーション先のディレクトリ・ファイル構成

$ LC_ALL=C ls -lR destination.db/
destination.db/:
total 1
drwxrwxr-x 3 hnakamur hnakamur 3 May 12 17:04 generations

destination.db/generations:
total 1
drwxrwxr-x 4 hnakamur hnakamur 4 May 12 17:04 40e9bff6b361ab2f

destination.db/generations/40e9bff6b361ab2f:
total 2
drwxrwxr-x 2 hnakamur hnakamur 3 May 12 17:04 snapshots
drwxrwxr-x 4 hnakamur hnakamur 4 May 12 17:05 wal

destination.db/generations/40e9bff6b361ab2f/snapshots:
total 5
-rw-r--r-- 1 hnakamur hnakamur 348 May 12 17:04 0000000000000000.snapshot.lz4

destination.db/generations/40e9bff6b361ab2f/wal:
total 10
drwxrwxr-x 2 hnakamur hnakamur 8 May 12 17:05 0000000000000000
drwxrwxr-x 2 hnakamur hnakamur 3 May 12 17:05 0000000000000001

destination.db/generations/40e9bff6b361ab2f/wal/0000000000000000:
total 18
-rw-r--r-- 1 hnakamur hnakamur 464 May 12 17:04 0000000000000000.wal.lz4
-rw-r--r-- 1 hnakamur hnakamur  91 May 12 17:05 0000000000004080.wal.lz4
-rw-r--r-- 1 hnakamur hnakamur  97 May 12 17:05 0000000000005098.wal.lz4
-rw-r--r-- 1 hnakamur hnakamur 102 May 12 17:05 00000000000060b0.wal.lz4
-rw-r--r-- 1 hnakamur hnakamur 111 May 12 17:05 00000000000070c8.wal.lz4
-rw-r--r-- 1 hnakamur hnakamur 122 May 12 17:05 00000000000080e0.wal.lz4

destination.db/generations/40e9bff6b361ab2f/wal/0000000000000001:
total 5
-rw-r--r-- 1 hnakamur hnakamur 119 May 12 17:05 0000000000000000.wal.lz4