Goでetcdを使ったシンプルなジョブキューを実装してみた
背景:これまではPostgreSQLのSELECT FOR UPDATE SKIP LOCKEDを使っていました
ジョブキュー用のデーブルで状態のカラムが処理待ちの行をSELECT FOR UPDATE SKIP LOCKED
にLIMIT 1
をつけて1行取得し、その後UPDATE
文でそのレコードの状態のカラムを処理中に変更して、
ジョブの処理を開始します。
で、処理が終わったらジョブキュー用のテーブルから対象のレコードを削除するという感じです。
キューの状態がPostgreSQLのテーブルで管理されるので、なにかトラブルがあっても調査や対処がしやすいというのが私にとっての利点です。 また元々PostgreSQLは使っているので、メッセージキューなどのミドルウェアを新たに運用せずに済むというのも利点です。
欠点は、私はワーカーをGoで書いてsystemdのサービスにしていて2台の仮想マシンのサーバーでアクティブ・スタンバイ構成にしているのですが、 アクティブとスタンバイの切り替えが手動であるという点です。
仮想マシンのホストサーバーが計画メンテナンスになったため仮想マシンを停止・起動する際や、ハードウェアやネットワークの障害が起きたときに、 理想としては自動で切り替わってほしいところです。
これまではサービスの数が少なかったので手動でも良いかと思っていたのですが、数が増えて来たのでそろそろ自動化したほうが良いなと思い始めていました。
サービスの死活監視をしてアクティブ・スタンバイを切り替えるような仕組みを自作するという案も考えられますが、 etcdを活用して楽できないかと思い、考えてみました。
なお、私がetcdを使うのは今回が初めてです(チュートリアルで触ったことは以前にもありましたが)。
etcdを使った案を考えてみた
なぜetcdか
なぜetcdかというと、Kubernetesなどで利用されていて実績が十分というのと、ディスク上のキー・バリュー・ストアのetcd-io/bbolt: An embedded key/value database for Go.は以前コードを見て(bboltのコードリーディング · hnakamur’s blog、bboltのフリーリストのコードリーディング · hnakamur’s blog)構造の概要を把握しているので安心感があること、あとはetcdctlという付属のクライアントでキー・バリューの状態を確認や修正できるので、なにかトラブルが起きても調査や対処がしやすそうと思ったからです。また、etcdの運用も手間がかからなさそうというのもあります。
今回使ったetcdの仕組み
キー・バリューペアの設定、取得、削除以外で、今回使ったetcdの仕組みとしては以下の3つです。
- トランザクションを使って、キーが存在しない場合にのみ作成する。
- キーを作成する際にleaseという仕組みを使ってTTLを設定、自動更新し、lease解放時とワーカーの異常終了時に自動的にキーを削除する。
- キーまたはキーの範囲をウォッチする。
1つめの2つめにより、排他制御のためのロックを実現し、かつ異常終了時にロックを自動的に解放するということができます。
1つめは、すべてのワーカーがトランザクションを使ってロックのキーが存在しない場合にのみ作成する必要があります。単にキーを設定しようとするとすると上書きできてしまうので、参加者すべてが協力的である必要があります。が、私の用途ではこれは大丈夫です。
Goのクライアントライブラリのgo.etcd.io/etcd/client/v3ではトランザクションはTxnというインターフェースになっています。
キーが存在しない場合という条件はキーのリビジョンが0であるIf(clientv3.Compare(clientv3.Version(key), "=", 0))
と書けば良いそうです(ChatGPTに聞いて知りました)。
2つめのleaseはetcdctlでの使い方がHow to create lease | etcdにあります。
go.etcd.io/etcd/client/v3ではLeaseというインタフェースとなっています。
このKeepAlive
メソッドを呼ぶと裏側で自動で定期的にleaseを更新してくれます。
その後正常にジョブが完了したらRevoke
メソッドを呼ぶとleaseが取り消されて対象のキーも消えます。
さらにありがたいのは、Revoke
を呼ばずにClientのCloseメソッドを呼んだり、さらにそれすら呼ばずにプロセスが異常終了した場合でも、leaseの自動更新が止まっていずれ期限切れになり、キーが自動で削除される点です。
実はgo.etcd.io/etcd/client/v3/concurrencyパッケージにMutexというのがあるのを見かけたのですが、コードを見てみると上記で説明した実装のほうがシンプルそうなので自前実装することにしました。ただし、私が気付いていない問題があるのかもしれないので、気付いた方はぜひ教えてください。かなりシンプルな仕組みなのでたぶん大丈夫だとは思うのですが。
ジョブの投入
ジョブのIDは自動でUUIDv7で生成し、job-queue/
のような接頭辞の後にジョブIDを連結したキーに利用者が指定したジョブの内容の文字列を値として設定します。この値にはワーカーがジョブを実行するために必要な情報を含めます。ジョブの実行と結果の反映に関しては、このジョブキューは関知せず、ワーカーの責任としています。
UUIDv7を使うことで作成した日時順にキーが並びます。万が一キーが衝突することを考えて、このキーの作成時もトランザクションを使って、キーが存在しない場合にのみ作成するようにします。衝突した場合は別のUUIDv7の値を生成してリトライします。
横道:ジョブIDをUUIDではなく連番にしたい場合の実装案
UUIDではなく連番にしたい場合は、連番発番用のキーを用意してトランザクションでキーが存在しない場合は初期値で作成し、存在する場合は次の番号で更新するという実装にすれば良さそうです。ジョブIDは文字列順に並ぶ必要があるので、10進数か16進数で左ゼロパディングして最大桁数にした文字列にするのが良さそうです。BigEndianのバイナリ形式でも良いでしょうが、その場合はetcdctlでの確認がしずらいので、運用時の確認用のツールを別途実装するのが良いと思います。今回は連番にするこだわりは無かったので、手軽にUUIDv7で良しとしました。
ジョブの取得と実行
実は最初にdistributed - How would you implement a working queue in etcd - Stack Overflowを見ていました。こちらは実行待ちのキュー(pending queue)と実行中のキュー(running queue)を分けるという案なのですが、実行中にワーカーが異常終了すると実行中のキューにジョブが残るので、ワーカーがジョブを取得するときは、実行中のキューも見る必要があるという話でした。あるいは異常終了したら実行中のキューから実行待ちのキューにジョブを戻すと良いのでしょうが、だれがその処理をするのかという問題があります。
ということで今回の実装ではキューは1つにしました。その代わり、実行中の状態を示すために別のキーを使います。job-worker/
のような別の接頭辞にジョブのUUIDの値を連結したキーとし、値はワーカーIDにします。機能的には実行中ということさえ分かればよいので値は何でも良いのですが、ワーカーIDにしておくと運用中に確認するときに便利そうだと思ったのでそうしました。
このキーも存在しない場合にのみ作成するようにします。キーが既に存在する場合は、他のワーカーが実行中ということなのでjob-queue/
配下の次のキーを同様に処理していきます。
このキーの値はleaseつきで作成してKeepAlive
でleaseを自動更新にしておきます。これでジョブごとにワーカーがロックを作ることになります。
job-queue/
配下にキーがないか全てのキーが実行中の場合は、job-queue/
配下とjob-worker/
配下をウォッチして変動があるまで待ち、変動があれば最初から探索して上記の処理を行います。
ワーカーがジョブの処理を完了したら、job-queue/
配下のジョブのキーを削除しleaseを解放してロックのキーも削除します。job-queue/
配下のジョブのキーの削除だけ終わった後に異常終了した場合は、leaseの更新が途絶えるのでいずれロックのキーも消えます。
この実装の問題点
今回の方式ではジョブが2回以上実行されることがあり得る
ジョブの実行が終わったらジョブのキーを削除しleaseを取り消してロックも解放するという前提ですが、ジョブの実行が終わった後ジョブのキーを消すまでの間に障害が発生してロックのleaseが更新できずジョブのキーが残ったままロックが強制的に解放されてしまうと、別のワーカー(あるいは障害から復旧した元のワーカー)が同じジョブを再度実行するということが起こりえます。
ジョブの開始後終了前でも同様で、途中まで実行したジョブを別のワーカーが最初から再度実行することになります。
ジョブを開始したらワーカーの責任で完遂させて、終わったら速やかにleaseを取り消すのが大切です。ジョブの結果を反映するのはジョブの最後にまとめて行うようにして、その前の処理は再実行しても問題ないように実装するのが良さそうです。ただし、その場合でも反映後ジョブのキーを消す前に異常終了してロックが強制解放されると、やはり別のワーカーによりジョブが最初から再実行されてしまいます。
これが許されないケースではこの実装は使わないでください。その場合はleaseを設定せずにロックを作成するような実装にするのが良さそうです。ただし、そうすると今度はワーカー異常終了時にロックが残るので、そちらへの対応を手動で行うかあるいは自動で対応できるような方式を考える必要があります。今回はこちらは考えないことにします。
また、ジョブの結果をPostgreSQLのデータベースに反映する場合は、冒頭のSELECT FOR UPDATE SKIP LOCKED
を使ったジョブキューのほうが、1つのトランザクションで結果の反映とジョブの削除を行えるという利点があります。
この実装の場合はジョブの結果の反映はジョブの削除とまとめてアトミックにはできません(ジョブの結果の反映先がetcdの場合は同じトランザクション内でできるように実装することは可能でしょうが、今回の実装では想定外です)。一方、ジョブの内容がメール送信や外部API呼び出しなどの場合は、どのみち完了したジョブの削除とまとめてアトミックにはできないので、今回の実装でも十分だと思います。
ジョブの結果をPostgreSQLのデータベースに反映する場合でも、PostgreSQL側のコミット後、完了したジョブをetcdで削除するまでの間に異常終了してleaseが期限切れになりロックが強制解放されたときに、再実行されるのが許容できるのであれば、今回の実装でも使って良いということになります。
ジョブごとに実行日時を指定できるキューも考えてみた
こちらはTimedQueue
という名前にし(英語として自然かは不明)、ジョブIDを実行日時とUUIDv7を_
で連結した文字列としました。
実行日時は文字列として順に並ぶような形式にする必要があるため、Time.Formatで20060102T150405
や20060102T150405.000
のような形式にしました。TimedQueue
のインスタンス作成時に日時の精度を秒、ミリ秒、マイクロ秒、ナノ秒から選ぶようにしました(当然ですが同じキューを使う全てのクライアントでそろえておく必要があります)。
なお、UUIDv7はジョブの登録時に生成するので、ジョブの実行日時とは無関係です。
ジョブの取得時はjob-queue/
配下のキーから実行日時部分を取り出して、現在日時以前なら実行中のロックを作って実行します。
最初のキーの実行日時が現在日時より後の場合は、job-queue/
配下とjob-worker/
配下をウォッチしつつその時間まで待ちます。その時間になるか、その時間の前にjob-queue/
配下かjob-worker/
配下に変動があれば、取得の処理を最初からやり直します。
繰り返し実行の仕組みはありませんが、ジョブの値に必要な情報を含めておいて、ワーカーがジョブの実行時に次回のジョブを投入するようにすれば、代替としては良さそうな気がします。
実運用に耐えるかは不明
まだとりあえずコード書いて、ちょっと動かしてみたレベルです。テストもまだ少ししか書いてなくて、これから増やしていきたいと思っているところです。 というわけでタイトルは正確には「実装してみた」ではなく「実装してみている途中」でした。