Litestreamのコードリーディング

はじめに

対象バージョン https://github.com/benbjohnson/litestream/tree/e6f7c6052d84b7265fd54d3a3ab33208948e126b

replicate と restore のコードを読んでみる。順を追って全部書くのは大変なので、気になったところだけメモする。

次回: Litestreamのコードリーディングその2

replicate と restore のログ出力例

$ litestream replicate source.db file:///home/hnakamur/litestream-work/destination.db
litestream (development build)
initialized db: /home/hnakamur/litestream-work/source.db
replicating to: name="file" type="file" path="/home/hnakamur/litestream-work/destination.db"
litestream initialization complete
/home/hnakamur/litestream-work/source.db: init: no wal files available, clearing generation
/home/hnakamur/litestream-work/source.db: init: no wal files available, clearing generation
/home/hnakamur/litestream-work/source.db: sync: new generation "40e9bff6b361ab2f", no generation exists
/home/hnakamur/litestream-work/source.db(file): snapshot written 40e9bff6b361ab2f/0000000000000000
/home/hnakamur/litestream-work/source.db(file): wal segment written: 40e9bff6b361ab2f/0000000000000000:0000000000000000 sz=16512
/home/hnakamur/litestream-work/source.db(file): wal segment written: 40e9bff6b361ab2f/0000000000000000:0000000000004080 sz=4120
/home/hnakamur/litestream-work/source.db(file): wal segment written: 40e9bff6b361ab2f/0000000000000000:0000000000005098 sz=4120
/home/hnakamur/litestream-work/source.db(file): wal segment written: 40e9bff6b361ab2f/0000000000000000:00000000000060b0 sz=4120
/home/hnakamur/litestream-work/source.db(file): wal segment written: 40e9bff6b361ab2f/0000000000000000:00000000000070c8 sz=4120
/home/hnakamur/litestream-work/source.db(file): wal segment written: 40e9bff6b361ab2f/0000000000000000:00000000000080e0 sz=4120
^Csignal received, litestream shutting down
/home/hnakamur/litestream-work/source.db: checkpoint(PASSIVE): [0,9,9]
/home/hnakamur/litestream-work/source.db(file): wal segment written: 40e9bff6b361ab2f/0000000000000001:0000000000000000 sz=4152
litestream shut down
$ litestream restore -o restored.db file:///home/hnakamur/litestream-work/destination.db
2022/05/12 18:10:26.394413 restoring snapshot 40e9bff6b361ab2f/0000000000000000 to restored.db.tmp
2022/05/12 18:10:26.404514 applied wal 40e9bff6b361ab2f/0000000000000000 elapsed=1.810673ms
2022/05/12 18:10:26.405770 applied wal 40e9bff6b361ab2f/0000000000000001 elapsed=1.237403ms
2022/05/12 18:10:26.405779 renaming database from temporary location

データベースのオープン周り

litestream-sqlite3 というカスタムのsqlドライバ

litestream.go 内の init() 関数で litestream-sqlite3 という名前のドライバを sql.Register で登録している。接続時のフックで conn.SetFileControlInt("main", sqlite3.SQLITE_FCNTL_PERSIST_WAL, 1) を呼んで、データベース接続を閉じた後も WAL ファイルが削除されないようにしている。

詳細は SQLite のリファレンスの SQLITE_FCNTL_PERSIST_WAL を参照。

データベースを開くのは 3 箇所

$ vgrep litestream-sqlite3
Index File          Line Content
    0 db.go          540 if db.db, err = sql.Open("litestream-sqlite3", dsn); err != nil {
    1 db.go          667 if db.db, err = sql.Open("litestream-sqlite3", dsn); err != nil {
    2 db.go         1880 d, err := sql.Open("litestream-sqlite3", dbPath)
    3 litestream.go   61 sql.Register("litestream-sqlite3", &sqlite3.SQLiteDriver{

DB.init メソッド

// init initializes the connection to the database. Skipped if already
// initialized or if the database file does not exist.

func (db *DB) init() (err error)

db.acquireReadLock メソッド

// acquireReadLock begins a read transaction on the database to prevent checkpointing.

func (db *DB) acquireReadLock() error

db.releaseReadLock メソッド

// releaseReadLock rolls back the long-running read transaction.

func (db *DB) releaseReadLock() error

スナップショットと WAL ファイル書き出し

db.Sync メソッド

// Sync copies pending data from the WAL to the shadow WAL.

func (db *DB) Sync(ctx context.Context) error

db.sync メソッド

func (db *DB) sync(ctx context.Context) (err error)

db.checkpoint メソッド

// checkpointAndInit performs a checkpoint on the WAL file and initializes a new shadow WAL file.

func (db *DB) checkpoint(ctx context.Context, generation, mode string) error

db.execCheckpoint メソッド

func (db *DB) execCheckpoint(mode string) (err error)

リストア

Restore 関数

// Restore restores the database to the given index on a generation.

func Restore(ctx context.Context, client ReplicaClient, filename, generation string, snapshotIndex, targetIndex int, opt RestoreOptions) (err error)

RestoreSnapshot 関数

// RestoreSnapshot copies a snapshot from the replica client to a file.

func RestoreSnapshot(ctx context.Context, client ReplicaClient, filename, generation string, index int, mode os.FileMode, uid, gid int) error

NewWALDownloader 関数

// NewWALDownloader returns a new instance of WALDownloader.

func NewWALDownloader(client ReplicaClient, prefix string, generation string, minIndex, maxIndex int) *WALDownloader

WALDownloader.Next メソッド

// Next returns the index & local file path of the next downloaded WAL file.

func (d *WALDownloader) Next(ctx context.Context) (int, string, error)

ApplyWAL 関数

// ApplyWAL performs a truncating checkpoint on the given database.

func ApplyWAL(ctx context.Context, dbPath, walPath string) error

db.Close メソッド

// Close flushes outstanding WAL writes to replicas, releases the read lock, and closes the database.

func (db *DB) Close() (err error)