Azure Queue StorageをバックエンドにしたPHP製のジョブキューライブラリ「Backjob」を作った

AzureにはQueue Storageというマネージドサービスがあるのですが、そのQueueStorageをバックエンドとして利用したPHP製のジョブキューライブラリを作りました。

github.com

なぜ作ったのか?

今自分の会社で作っているプロダクトの特性がWrite Heavyなアプリケーションになっており、ユーザーがプッシュ通知などで大量アクセスするとWebApp側はスケールできてもRDB側がスケールできないという課題がありました。

それに加えて、2年程まえに自分達で自作していたジョブキューシステムがあったのですがそれがアプリケーションで使っているRDBをそのままバックエンドとして使用しており、その性能限界により「ジョブ登録が大量に来るとロック待ちが発生し障害の元となる」、「ジョブが詰まったときにワーカーを増やして処理性能をあげたいがRDBのパフォーマンスネックによリこれ以上ワーカーを増やせない」という問題もありました。

その課題を解決するために、RDBではなくAzure Queue Storageを用いたジョブキューライブラリを作り、ユーザーからのリクエストで直接RDBに書き込みをするのではなく新しいジョブキュー経由で書き込みをするように変更することでDB負荷の軽減と既存ジョブキューライブラリの課題を解決させたいと考えました。

ここでLaravelなどのモダンなフレームワークを使っていれば標準なジョブキューライブラリがあると思いますが、私達が利用するFuelPHPにはそのようなものがなく、また他のフレームワークで利用されるジョブキューライブラリをみてもRedisやAWSのSQSに対応していてもAzureのQueue Storageには対応していない状況だったため自分で作りました。

インストール

composer でインストールできるように公開しているので、以下の内容をプロジェクトのcomposer.jsonに追記してください。

{
    "require": {
        "arakaki-yuji/backjob": "^0.0.5"
    }
}

使い方

自分のJobを定義する

以下のように\Backjob\Jobクラスを継承して自分の独自Jobを定義します。

class CustomJob extends \Backjob\Job
{
    /**
     * You must define a run method.
     * this method is called when dequeued and run
     */
    public function run()
    {
        $msg = $this->params['message'];
        return $msg;
    }
    
    /**
     * this method is optional.
     * if you define success method, it is called after run method successed.
     */
    public function success()
    {
        return 'success job';
    }
    
     /**
     * this method is optional.
     * if you define fail method, it is called after run method failed.
     */
    public function fail()
    {
        return 'success job';
    }
}

runメソッドはJobが実行されるときに呼ばれるメソッドです。このrunメソッドの中でこのJobが実行したいタスクの実装を書いていきます。 Jobを登録するときにパラメータが渡せるようになっているのですが、そのパラメータは$this->paramsの中に連想配列で入っています。 このrunメソッドは必ず定義してください。

failメソッドとsuccessメソッドは実装してもしなくても構いません。

successメソッドはrunメソッドが正常に実行されたあとに呼ばれるメソッドです。

failメソッドはrunメソッド内で例外やエラーが発生したときに呼ばれるメソッドです。

それぞれ自分たちのアプリケーションの要件に合わせて利用してください。

Jobの登録、そして登録されたジョブの実行の仕方

キューへのジョブの登録、そしてキューからジョブを取り出して実行するために利用するBackjobクラスのインスタンスを作ります。

引数にはQueue Storageのストレージアカウント名、Queue Storageの名前、ストレージアカウントのアクセスキーを与えます。

$backjob = new \Backjob\Backjob::factory($storageAccountName, $queueName, $accessKey);

ジョブをキューストレージに登録するには、登録したいジョブのクラスメソッド makeJobメソッドに引数としてジョブに渡したいパラメータを連想配列で渡し、インスタンス化します。

その後、Backjobインスタンスインスタンスメソッド、queueの引数としてジョブを渡して上げればジョブキューへの登録が完了します。

$params = ['message' => 'Hello Backjob'];
$job = CustomeJob::makeJob($params);
$backjob->queue($job);

ジョブキューに登録されたジョブを実行するには、Backjobインスタンスのrunメソッドを実行します。 runメソッドを実行すると、ジョブキューからひとつのジョブを取り出し、そのジョブをrunメソッドを実行して処理を行います。

$backjob = new \Backjob\Backjob::factory($storageAccountName, $queueName, $accessKey);
$backjob->run(); // => 'Hello Backjob'

このrunメソッドをcronで叩く、もしくはdaemon化してループで呼び続けるコードをそれぞれ使っているフレームワークの流儀に沿って作っていただければOKです。

今後について

自分達のプロダクトに必要だったから作ったライブラリなので、今後も必要に応じてアップデートはしていく予定です。

いま考えている範囲だと、Azure Queue Storageの性能限界を超えた量のジョブを処理する必要になった場合にバックエンドに用意するQueue Storageを複数設定できるようにして負荷分散に対応する、またqueueへの登録が失敗したときのretry回数やtimeoutの設定できるようにするなどはやりたいと考えています。

とりあえず現時点で一週間で40万件以上のジョブを処理しているので、ちゃんと使えると思います。