コンテンツへスキップ

キュー

イントロダクション

Webアプリケーションの構築中、アップロードされたCSVファイルの解析や保存など、通常のWebリクエスト中に実行するには時間がかかりすぎるタスクが発生することがあります。ありがたいことに、Laravelではバックグラウンドで処理できるキュー投入されたジョブを簡単に作成できます。時間のかかるタスクをキューに移動することで、アプリケーションはWebリクエストに高速で応答し、顧客により良いユーザーエクスペリエンスを提供できます。

Laravelキューは、Amazon SQSRedis、さらにはリレーショナルデータベースなど、さまざまなキューバックエンドにわたる統一されたキューイングAPIを提供します。

Laravelのキュー設定オプションは、アプリケーションのconfig/queue.php設定ファイルに保存されています。このファイルには、データベース、Amazon SQSRedisBeanstalkdドライバなど、フレームワークに含まれる各キュードライバの接続設定があります。また、ジョブを即時に実行する同期ドライバ(ローカル開発中に使用)も含まれています。キュー投入されたジョブを破棄するnullキュードライバも用意しています。

Laravelは現在、Redisを利用したキューのための美しいダッシュボードと設定システムであるHorizonを提供しています。詳細はHorizonの完全なドキュメントをご覧ください。

接続とキュー

Laravelキューを使い始める前に、「接続」と「キュー」の違いを理解することが重要です。config/queue.php設定ファイルには、connections設定配列があります。このオプションは、Amazon SQS、Beanstalk、Redisなどのバックエンドキューサービスへの接続を定義します。しかし、どの特定のキュー接続も複数の「キュー」を持つことができ、これらはキュー投入されたジョブの異なるスタックや山と考えることができます。

queue設定ファイル内の各接続設定例には、queue属性が含まれていることに注意してください。これは、ジョブが特定の接続に送信されるときにディスパッチされるデフォルトのキューです。言い換えれば、どのキューにディスパッチすべきかを明示的に定義せずにジョブをディスパッチした場合、そのジョブは接続設定のqueue属性で定義されたキューに配置されます。

1use App\Jobs\ProcessPodcast;
2 
3// This job is sent to the default connection's default queue...
4ProcessPodcast::dispatch();
5 
6// This job is sent to the default connection's "emails" queue...
7ProcessPodcast::dispatch()->onQueue('emails');

一部のアプリケーションでは、ジョブを複数のキューにプッシュする必要はなく、代わりに1つのシンプルなキューを持つことを好む場合があります。しかし、複数のキューにジョブをプッシュすることは、ジョブの処理方法に優先順位を付けたり、セグメント化したりしたいアプリケーションにとって特に便利です。Laravelのキューワーカは、優先順位に従って処理するキューを指定できるからです。たとえば、ジョブをhighキューにプッシュする場合、より高い処理優先順位を与えるワーカを実行できます。

1php artisan queue:work --queue=high,default

ドライバの注意点と前提条件

データベース

databaseキュードライバを使用するには、ジョブを保持するためのデータベーステーブルが必要です。通常、これはLaravelのデフォルトの0001_01_01_000002_create_jobs_table.php データベースマイグレーションに含まれています。しかし、アプリケーションにこのマイグレーションが含まれていない場合は、make:queue-table Artisanコマンドを使用して作成できます。

1php artisan make:queue-table
2 
3php artisan migrate

Redis

redisキュードライバを使用するには、config/database.php設定ファイルでRedisデータベース接続を設定する必要があります。

serializerおよびcompression Redisオプションは、redisキュードライバではサポートされていません。

Redisクラスタ

Redisキュー接続がRedisクラスタを使用している場合、キュー名にはキーハッシュタグを含める必要があります。これは、特定のキューのすべてのRedisキーが同じハッシュスロットに配置されるようにするために必要です。

1'redis' => [
2 'driver' => 'redis',
3 'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
4 'queue' => env('REDIS_QUEUE', '{default}'),
5 'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
6 'block_for' => null,
7 'after_commit' => false,
8],

ブロッキング

Redisキューを使用する場合、block_for設定オプションを使用して、ドライバがワーカループを反復処理してRedisデータベースを再ポーリングする前に、ジョブが利用可能になるまでどれくらい待機するかを指定できます。

キューの負荷に応じてこの値を調整することは、新しいジョブを求めてRedisデータベースを継続的にポーリングするよりも効率的です。たとえば、値を5に設定して、ドライバがジョブが利用可能になるまで5秒間ブロックすることを示すことができます。

1'redis' => [
2 'driver' => 'redis',
3 'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
4 'queue' => env('REDIS_QUEUE', 'default'),
5 'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
6 'block_for' => 5,
7 'after_commit' => false,
8],

block_for0に設定すると、ジョブが利用可能になるまでキューワーカは無期限にブロックします。これにより、次のジョブが処理されるまでSIGTERMのようなシグナルが処理されなくなります。

その他のドライバの前提条件

リストされているキュードライバには、以下の依存関係が必要です。これらの依存関係は、Composerパッケージマネージャを介してインストールできます。

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~5.0
  • Redis: predis/predis ~2.0 または phpredis PHP拡張
  • MongoDB: mongodb/laravel-mongodb

ジョブの作成

ジョブクラスの生成

デフォルトでは、アプリケーションのキュー投入可能なジョブはすべてapp/Jobsディレクトリに保存されます。app/Jobsディレクトリが存在しない場合は、make:job Artisanコマンドを実行したときに作成されます。

1php artisan make:job ProcessPodcast

生成されたクラスはIlluminate\Contracts\Queue\ShouldQueueインターフェイスを実装し、Laravelに対してジョブを非同期で実行するためにキューにプッシュすべきであることを示します。

ジョブスタブは、スタブの公開を使用してカスタマイズできます。

クラスの構造

ジョブクラスは非常にシンプルで、通常、キューによってジョブが処理されるときに呼び出されるhandleメソッドのみを含んでいます。まず、ジョブクラスの例を見てみましょう。この例では、ポッドキャスト公開サービスを管理しており、公開前にアップロードされたポッドキャストファイルを処理する必要があると仮定します。

1<?php
2 
3namespace App\Jobs;
4 
5use App\Models\Podcast;
6use App\Services\AudioProcessor;
7use Illuminate\Contracts\Queue\ShouldQueue;
8use Illuminate\Foundation\Queue\Queueable;
9 
10class ProcessPodcast implements ShouldQueue
11{
12 use Queueable;
13 
14 /**
15 * Create a new job instance.
16 */
17 public function __construct(
18 public Podcast $podcast,
19 ) {}
20 
21 /**
22 * Execute the job.
23 */
24 public function handle(AudioProcessor $processor): void
25 {
26 // Process uploaded podcast...
27 }
28}

この例では、Eloquentモデルをキュー投入されたジョブのコンストラクタに直接渡すことができたことに注意してください。ジョブが使用しているQueueableトレイトのおかげで、Eloquentモデルとロード済みのリレーションシップは、ジョブの処理中に適切にシリアライズおよびデシリアライズされます。

キュー投入されたジョブがコンストラクタでEloquentモデルを受け入れる場合、モデルの識別子のみがキューにシリアライズされます。ジョブが実際に処理されるとき、キューシステムはデータベースから完全なモデルインスタンスとロード済みのリレーションシップを自動的に再取得します。このモデルのシリアライズ方法により、キュードライバに送信されるジョブのペイロードをはるかに小さくできます。

handleメソッドの依存性注入

handleメソッドは、ジョブがキューによって処理されるときに呼び出されます。ジョブのhandleメソッドで依存関係をタイプヒントできることに注意してください。Laravelのサービスコンテナは、これらの依存関係を自動的に注入します。

コンテナがhandleメソッドに依存関係をどのように注入するかを完全に制御したい場合は、コンテナのbindMethodメソッドを使用できます。bindMethodメソッドは、ジョブとコンテナを受け取るコールバックを受け入れます。コールバック内では、handleメソッドを好きなように呼び出すことができます。通常、このメソッドはApp\Providers\AppServiceProvider サービスプロバイダbootメソッドから呼び出すべきです。

1use App\Jobs\ProcessPodcast;
2use App\Services\AudioProcessor;
3use Illuminate\Contracts\Foundation\Application;
4 
5$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
6 return $job->handle($app->make(AudioProcessor::class));
7});

生の画像コンテンツなどのバイナリデータは、キュー投入されたジョブに渡す前にbase64_encode関数を介して渡す必要があります。そうしないと、ジョブがキューに配置されるときにJSONに正しくシリアライズされない可能性があります。

キュー投入されたリレーションシップ

ジョブがキューに投入されるとき、ロード済みのEloquentモデルリレーションシップもすべてシリアライズされるため、シリアライズされたジョブ文字列は非常に大きくなることがあります。さらに、ジョブがデシリアライズされてモデルリレーションシップがデータベースから再取得されるとき、それらは全体として取得されます。ジョブのキューイングプロセス中にモデルがシリアライズされる前に適用された以前のリレーションシップ制約は、ジョブがデシリアライズされるときには適用されません。したがって、特定のリレーションシップのサブセットを操作したい場合は、キュー投入されたジョブ内でそのリレーションシップを再制約する必要があります。

または、リレーションがシリアライズされないようにするために、プロパティ値を設定するときにモデルでwithoutRelationsメソッドを呼び出すことができます。このメソッドは、ロードされたリレーションシップなしでモデルのインスタンスを返します。

1/**
2 * Create a new job instance.
3 */
4public function __construct(
5 Podcast $podcast,
6) {
7 $this->podcast = $podcast->withoutRelations();
8}

PHPのコンストラクタプロパティプロモーションを使用していて、Eloquentモデルのリレーションをシリアライズすべきではないことを示したい場合は、WithoutRelations属性を使用できます。

1use Illuminate\Queue\Attributes\WithoutRelations;
2 
3/**
4 * Create a new job instance.
5 */
6public function __construct(
7 #[WithoutRelations]
8 public Podcast $podcast,
9) {}

ジョブが単一のモデルではなく、Eloquentモデルのコレクションまたは配列を受け取る場合、そのコレクション内のモデルは、ジョブがデシリアライズされて実行されるときにリレーションシップが復元されません。これは、多数のモデルを扱うジョブでの過剰なリソース使用を防ぐためです。

ユニークジョブ

ユニークジョブには、ロックをサポートするキャッシュドライバが必要です。現在、memcachedredisdynamodbdatabasefile、およびarrayキャッシュドライバがアトミックロックをサポートしています。さらに、ユニークジョブの制約はバッチ内のジョブには適用されません。

特定のジョブのインスタンスがキュー上に常に1つだけであることを保証したい場合があります。これは、ジョブクラスでShouldBeUniqueインターフェイスを実装することで実現できます。このインターフェイスは、クラスに追加のメソッドを定義する必要はありません。

1<?php
2 
3use Illuminate\Contracts\Queue\ShouldQueue;
4use Illuminate\Contracts\Queue\ShouldBeUnique;
5 
6class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
7{
8 // ...
9}

上記の例では、UpdateSearchIndexジョブはユニークです。したがって、同じジョブの別のインスタンスが既にキューにあり、処理が完了していない場合、そのジョブはディスパッチされません。

特定の場合では、ジョブをユニークにする特定の「キー」を定義したり、ジョブがユニークでなくなるまでのタイムアウトを指定したりしたい場合があります。これを実現するために、ジョブクラスにuniqueIdおよびuniqueForプロパティまたはメソッドを定義できます。

1<?php
2 
3use App\Models\Product;
4use Illuminate\Contracts\Queue\ShouldQueue;
5use Illuminate\Contracts\Queue\ShouldBeUnique;
6 
7class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
8{
9 /**
10 * The product instance.
11 *
12 * @var \App\Product
13 */
14 public $product;
15 
16 /**
17 * The number of seconds after which the job's unique lock will be released.
18 *
19 * @var int
20 */
21 public $uniqueFor = 3600;
22 
23 /**
24 * Get the unique ID for the job.
25 */
26 public function uniqueId(): string
27 {
28 return $this->product->id;
29 }
30}

上記の例では、UpdateSearchIndexジョブは製品IDによってユニークです。したがって、同じ製品IDを持つジョブの新しいディスパッチは、既存のジョブが処理を完了するまで無視されます。さらに、既存のジョブが1時間以内に処理されない場合、ユニークロックが解放され、同じユニークキーを持つ別のジョブをキューにディスパッチできます。

アプリケーションが複数のWebサーバまたはコンテナからジョブをディスパッチする場合、Laravelがジョブがユニークであるかを正確に判断できるように、すべてのサーバが同じ中央キャッシュサーバと通信していることを確認する必要があります。

処理が開始されるまでジョブをユニークに保つ

デフォルトでは、ユニークジョブはジョブが処理を完了するか、すべての再試行に失敗した後に「ロック解除」されます。しかし、ジョブが処理される直前にロックを解除したい状況があるかもしれません。これを実現するために、ジョブはShouldBeUniqueコントラクトの代わりにShouldBeUniqueUntilProcessingコントラクトを実装する必要があります。

1<?php
2 
3use App\Models\Product;
4use Illuminate\Contracts\Queue\ShouldQueue;
5use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
6 
7class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
8{
9 // ...
10}

ユニークジョブのロック

内部では、ShouldBeUniqueジョブがディスパッチされると、LaravelはuniqueIdキーでロックを取得しようとします。ロックが取得できない場合、ジョブはディスパッチされません。このロックは、ジョブが処理を完了するか、すべての再試行に失敗したときに解放されます。デフォルトでは、Laravelはこのロックを取得するためにデフォルトのキャッシュドライバを使用します。しかし、ロックを取得するために別のドライバを使用したい場合は、使用すべきキャッシュドライバを返すuniqueViaメソッドを定義できます。

1use Illuminate\Contracts\Cache\Repository;
2use Illuminate\Support\Facades\Cache;
3 
4class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
5{
6 // ...
7 
8 /**
9 * Get the cache driver for the unique job lock.
10 */
11 public function uniqueVia(): Repository
12 {
13 return Cache::driver('redis');
14 }
15}

ジョブの同時処理を制限する必要があるだけの場合は、代わりにWithoutOverlappingジョブミドルウェアを使用してください。

暗号化ジョブ

Laravelでは、暗号化によってジョブデータのプライバシーと完全性を保証できます。始めるには、ジョブクラスにShouldBeEncryptedインターフェイスを追加するだけです。このインターフェイスがクラスに追加されると、Laravelはジョブをキューにプッシュする前に自動的に暗号化します。

1<?php
2 
3use Illuminate\Contracts\Queue\ShouldBeEncrypted;
4use Illuminate\Contracts\Queue\ShouldQueue;
5 
6class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
7{
8 // ...
9}

ジョブミドルウェア

ジョブミドルウェアを使用すると、キュー投入されたジョブの実行をカスタムロジックでラップし、ジョブ自体の定型コードを減らすことができます。たとえば、LaravelのRedisレート制限機能を利用して、5秒ごとに1つのジョブのみを処理できるようにする次のhandleメソッドを考えてみましょう。

1use Illuminate\Support\Facades\Redis;
2 
3/**
4 * Execute the job.
5 */
6public function handle(): void
7{
8 Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
9 info('Lock obtained...');
10 
11 // Handle job...
12 }, function () {
13 // Could not obtain lock...
14 
15 return $this->release(5);
16 });
17}

このコードは有効ですが、Redisのレート制限ロジックで煩雑になっているため、handleメソッドの実装がうるさくなります。さらに、このレート制限ロジックは、レート制限したい他のジョブでも複製する必要があります。

handleメソッドでレート制限する代わりに、レート制限を処理するジョブミドルウェアを定義できます。Laravelにはジョブミドルウェアのデフォルトの場所がないため、アプリケーションのどこにでもジョブミドルウェアを配置できます。この例では、ミドルウェアをapp/Jobs/Middlewareディレクトリに配置します。

1<?php
2 
3namespace App\Jobs\Middleware;
4 
5use Closure;
6use Illuminate\Support\Facades\Redis;
7 
8class RateLimited
9{
10 /**
11 * Process the queued job.
12 *
13 * @param \Closure(object): void $next
14 */
15 public function handle(object $job, Closure $next): void
16 {
17 Redis::throttle('key')
18 ->block(0)->allow(1)->every(5)
19 ->then(function () use ($job, $next) {
20 // Lock obtained...
21 
22 $next($job);
23 }, function () use ($job) {
24 // Could not obtain lock...
25 
26 $job->release(5);
27 });
28 }
29}

ルートミドルウェアのように、ジョブミドルウェアは処理中のジョブと、ジョブの処理を続行するために呼び出すべきコールバックを受け取ることがわかります。

ジョブミドルウェアを作成した後、ジョブのmiddlewareメソッドからそれらを返すことでジョブにアタッチできます。このメソッドはmake:job Artisanコマンドでスキャフォールドされたジョブには存在しないため、ジョブクラスに手動で追加する必要があります。

1use App\Jobs\Middleware\RateLimited;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array<int, object>
7 */
8public function middleware(): array
9{
10 return [new RateLimited];
11}

ジョブミドルウェアは、キュー投入可能なイベントリスナ、メイラブル、通知にも割り当てることができます。

レート制限

独自のレート制限ジョブミドルウェアを作成する方法を実演しましたが、Laravelにはジョブをレート制限するために利用できるレート制限ミドルウェアが実際に含まれています。ルートレートリミッタと同様に、ジョブのレートリミッタはRateLimiterファサードのforメソッドを使用して定義されます。

たとえば、ユーザーが1時間に1回データをバックアップできるようにし、プレミアム顧客にはそのような制限を課さないようにしたい場合があります。これを実現するために、AppServiceProviderbootメソッドでRateLimiterを定義できます。

1use Illuminate\Cache\RateLimiting\Limit;
2use Illuminate\Support\Facades\RateLimiter;
3 
4/**
5 * Bootstrap any application services.
6 */
7public function boot(): void
8{
9 RateLimiter::for('backups', function (object $job) {
10 return $job->user->vipCustomer()
11 ? Limit::none()
12 : Limit::perHour(1)->by($job->user->id);
13 });
14}

上記の例では、時間単位のレート制限を定義しましたが、perMinuteメソッドを使用して分単位のレート制限を簡単に定義できます。さらに、レート制限のbyメソッドに好きな値を渡すことができますが、この値は通常、顧客ごとにレート制限をセグメント化するために使用されます。

1return Limit::perMinute(50)->by($job->user->id);

レート制限を定義したら、Illuminate\Queue\Middleware\RateLimitedミドルウェアを使用してレートリミッタをジョブにアタッチできます。ジョブがレート制限を超えるたびに、このミドルウェアはレート制限期間に基づいて適切な遅延を伴ってジョブをキューに戻します。

1use Illuminate\Queue\Middleware\RateLimited;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array<int, object>
7 */
8public function middleware(): array
9{
10 return [new RateLimited('backups')];
11}

レート制限されたジョブをキューに戻すと、ジョブの総attempts数が増加します。ジョブクラスのtriesプロパティとmaxExceptionsプロパティを適宜調整することをお勧めします。または、retryUntilメソッドを使用して、ジョブが試行されなくなるまでの時間を定義することもできます。

レート制限されたときにジョブを再試行したくない場合は、dontReleaseメソッドを使用できます。

1/**
2 * Get the middleware the job should pass through.
3 *
4 * @return array<int, object>
5 */
6public function middleware(): array
7{
8 return [(new RateLimited('backups'))->dontRelease()];
9}

Redisを使用している場合は、Redisに最適化され、基本的なレート制限ミドルウェアよりも効率的なIlluminate\Queue\Middleware\RateLimitedWithRedisミドルウェアを使用できます。

ジョブの重複防止

Laravelには、任意のキーに基づいてジョブの重複を防ぐことができるIlluminate\Queue\Middleware\WithoutOverlappingミドルウェアが含まれています。これは、キュー投入されたジョブが一度に1つのジョブによってのみ変更されるべきリソースを変更している場合に役立ちます。

たとえば、ユーザーのクレジットスコアを更新するキュー投入されたジョブがあり、同じユーザーIDに対するクレジットスコア更新ジョブの重複を防ぎたいとします。これを実現するために、ジョブのmiddlewareメソッドからWithoutOverlappingミドルウェアを返すことができます。

1use Illuminate\Queue\Middleware\WithoutOverlapping;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array<int, object>
7 */
8public function middleware(): array
9{
10 return [new WithoutOverlapping($this->user->id)];
11}

同じタイプの重複するジョブはキューに戻されます。解放されたジョブが再度試行されるまでに経過する必要がある秒数を指定することもできます。

1/**
2 * Get the middleware the job should pass through.
3 *
4 * @return array<int, object>
5 */
6public function middleware(): array
7{
8 return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
9}

重複するジョブを再試行しないようにすぐに削除したい場合は、dontReleaseメソッドを使用できます。

1/**
2 * Get the middleware the job should pass through.
3 *
4 * @return array<int, object>
5 */
6public function middleware(): array
7{
8 return [(new WithoutOverlapping($this->order->id))->dontRelease()];
9}

WithoutOverlappingミドルウェアは、Laravelのアトミックロック機能によって動作します。ジョブが予期せず失敗したり、タイムアウトしたりしてロックが解放されないことがあります。したがって、expireAfterメソッドを使用してロックの有効期限を明示的に定義できます。たとえば、以下の例では、ジョブの処理開始から3分後にWithoutOverlappingロックを解放するようにLaravelに指示します。

1/**
2 * Get the middleware the job should pass through.
3 *
4 * @return array<int, object>
5 */
6public function middleware(): array
7{
8 return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
9}

WithoutOverlappingミドルウェアには、ロックをサポートするキャッシュドライバが必要です。現在、memcachedredisdynamodbdatabasefile、およびarrayキャッシュドライバがアトミックロックをサポートしています。

ジョブクラス間でのロックキーの共有

デフォルトでは、WithoutOverlappingミドルウェアは同じクラスの重複ジョブのみを防ぎます。したがって、2つの異なるジョブクラスが同じロックキーを使用しても、重複は防がれません。しかし、sharedメソッドを使用して、ジョブクラス間でキーを適用するようにLaravelに指示できます。

1use Illuminate\Queue\Middleware\WithoutOverlapping;
2 
3class ProviderIsDown
4{
5 // ...
6 
7 public function middleware(): array
8 {
9 return [
10 (new WithoutOverlapping("status:{$this->provider}"))->shared(),
11 ];
12 }
13}
14 
15class ProviderIsUp
16{
17 // ...
18 
19 public function middleware(): array
20 {
21 return [
22 (new WithoutOverlapping("status:{$this->provider}"))->shared(),
23 ];
24 }
25}

例外のスロットリング

Laravelには、例外をスロットリングできるIlluminate\Queue\Middleware\ThrottlesExceptionsミドルウェアが含まれています。ジョブが特定の回数の例外をスローすると、ジョブを実行するさらなる試行は、指定された時間間隔が経過するまで遅延されます。このミドルウェアは、不安定なサードパーティサービスと対話するジョブに特に役立ちます。

たとえば、例外をスローし始めたサードパーティAPIと対話するキュー投入ジョブを想像してみましょう。例外をスロットリングするには、ジョブのmiddlewareメソッドからThrottlesExceptionsミドルウェアを返すことができます。通常、このミドルウェアは時間ベースの試行を実装するジョブと組み合わせるべきです。

1use DateTime;
2use Illuminate\Queue\Middleware\ThrottlesExceptions;
3 
4/**
5 * Get the middleware the job should pass through.
6 *
7 * @return array<int, object>
8 */
9public function middleware(): array
10{
11 return [new ThrottlesExceptions(10, 5 * 60)];
12}
13 
14/**
15 * Determine the time at which the job should timeout.
16 */
17public function retryUntil(): DateTime
18{
19 return now()->addMinutes(30);
20}

ミドルウェアが受け入れる最初のコンストラクタ引数は、スロットリングされる前にジョブがスローできる例外の数であり、2番目のコンストラクタ引数は、スロットリングされた後にジョブが再度試行されるまでに経過すべき秒数です。上記のコード例では、ジョブが10回連続で例外をスローした場合、30分の時間制限内で、ジョブを再度試行する前に5分間待ちます。

ジョブが例外をスローしたが、例外のしきい値にまだ達していない場合、ジョブは通常すぐに再試行されます。ただし、ミドルウェアをジョブにアタッチするときにbackoffメソッドを呼び出すことで、そのようなジョブを遅延させる分数を指定できます。

1use Illuminate\Queue\Middleware\ThrottlesExceptions;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array<int, object>
7 */
8public function middleware(): array
9{
10 return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];
11}

内部的に、このミドルウェアはLaravelのキャッシュシステムを使用してレート制限を実装し、ジョブのクラス名がキャッシュの「キー」として利用されます。ミドルウェアをジョブにアタッチするときにbyメソッドを呼び出すことで、このキーをオーバーライドできます。これは、複数のジョブが同じサードパーティサービスと対話していて、共通のスロットリング「バケット」を共有させたい場合に便利です。

1use Illuminate\Queue\Middleware\ThrottlesExceptions;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array<int, object>
7 */
8public function middleware(): array
9{
10 return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];
11}

デフォルトでは、このミドルウェアはすべての例外をスロットリングします。この動作は、ミドルウェアをジョブにアタッチするときにwhenメソッドを呼び出すことで変更できます。その場合、例外はwhenメソッドに提供されたクロージャがtrueを返した場合にのみスロットリングされます。

1use Illuminate\Http\Client\HttpClientException;
2use Illuminate\Queue\Middleware\ThrottlesExceptions;
3 
4/**
5 * Get the middleware the job should pass through.
6 *
7 * @return array<int, object>
8 */
9public function middleware(): array
10{
11 return [(new ThrottlesExceptions(10, 10 * 60))->when(
12 fn (Throwable $throwable) => $throwable instanceof HttpClientException
13 )];
14}

スロットリングされた例外をアプリケーションの例外ハンドラに報告させたい場合は、ミドルウェアをジョブにアタッチするときにreportメソッドを呼び出すことで行うことができます。オプションで、reportメソッドにクロージャを提供することができ、その場合、例外は指定されたクロージャがtrueを返す場合にのみ報告されます。

1use Illuminate\Http\Client\HttpClientException;
2use Illuminate\Queue\Middleware\ThrottlesExceptions;
3 
4/**
5 * Get the middleware the job should pass through.
6 *
7 * @return array<int, object>
8 */
9public function middleware(): array
10{
11 return [(new ThrottlesExceptions(10, 10 * 60))->report(
12 fn (Throwable $throwable) => $throwable instanceof HttpClientException
13 )];
14}

Redisを使用している場合は、Redisに最適化され、基本的な例外スロットリングミドルウェアよりも効率的なIlluminate\Queue\Middleware\ThrottlesExceptionsWithRedisミドルウェアを使用できます。

ジョブのスキップ

Skipミドルウェアを使用すると、ジョブのロジックを変更することなく、ジョブをスキップ/削除するように指定できます。Skip::whenメソッドは、指定された条件がtrueと評価された場合にジョブを削除し、Skip::unlessメソッドは、条件がfalseと評価された場合にジョブを削除します。

1use Illuminate\Queue\Middleware\Skip;
2 
3/**
4 * Get the middleware the job should pass through.
5 */
6public function middleware(): array
7{
8 return [
9 Skip::when($someCondition),
10 ];
11}

より複雑な条件評価のために、whenおよびunlessメソッドにClosureを渡すこともできます。

1use Illuminate\Queue\Middleware\Skip;
2 
3/**
4 * Get the middleware the job should pass through.
5 */
6public function middleware(): array
7{
8 return [
9 Skip::when(function (): bool {
10 return $this->shouldSkip();
11 }),
12 ];
13}

ジョブのディスパッチ

ジョブクラスを作成したら、ジョブ自体のdispatchメソッドを使用してディスパッチできます。dispatchメソッドに渡された引数は、ジョブのコンストラクタに渡されます。

1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Http\Controllers\Controller;
6use App\Jobs\ProcessPodcast;
7use App\Models\Podcast;
8use Illuminate\Http\RedirectResponse;
9use Illuminate\Http\Request;
10 
11class PodcastController extends Controller
12{
13 /**
14 * Store a new podcast.
15 */
16 public function store(Request $request): RedirectResponse
17 {
18 $podcast = Podcast::create(/* ... */);
19 
20 // ...
21 
22 ProcessPodcast::dispatch($podcast);
23 
24 return redirect('/podcasts');
25 }
26}

条件付きでジョブをディスパッチしたい場合は、dispatchIfおよびdispatchUnlessメソッドを使用できます。

1ProcessPodcast::dispatchIf($accountActive, $podcast);
2 
3ProcessPodcast::dispatchUnless($accountSuspended, $podcast);

新しいLaravelアプリケーションでは、syncドライバがデフォルトのキュードライバです。このドライバは、現在のリクエストのフォアグラウンドでジョブを同期的に実行します。これは、ローカル開発中には便利なことが多いです。実際にバックグラウンド処理のためにジョブのキューイングを開始したい場合は、アプリケーションのconfig/queue.php設定ファイルで別のキュードライバを指定できます。

遅延ディスパッチ

ジョブがキューワーカによってすぐに処理可能になるべきではないと指定したい場合は、ジョブをディスパッチするときにdelayメソッドを使用できます。たとえば、ジョブがディスパッチされてから10分後まで処理可能にしないように指定してみましょう。

1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Http\Controllers\Controller;
6use App\Jobs\ProcessPodcast;
7use App\Models\Podcast;
8use Illuminate\Http\RedirectResponse;
9use Illuminate\Http\Request;
10 
11class PodcastController extends Controller
12{
13 /**
14 * Store a new podcast.
15 */
16 public function store(Request $request): RedirectResponse
17 {
18 $podcast = Podcast::create(/* ... */);
19 
20 // ...
21 
22 ProcessPodcast::dispatch($podcast)
23 ->delay(now()->addMinutes(10));
24 
25 return redirect('/podcasts');
26 }
27}

場合によっては、ジョブにデフォルトの遅延が設定されていることがあります。この遅延をバイパスしてジョブを即時処理のためにディスパッチする必要がある場合は、withoutDelayメソッドを使用できます。

1ProcessPodcast::dispatch($podcast)->withoutDelay();

Amazon SQSキューサービスの最大遅延時間は15分です。

ブラウザに応答が送信された後のディスパッチ

あるいは、dispatchAfterResponseメソッドは、WebサーバがFastCGIを使用している場合、HTTPレスポンスがユーザーのブラウザに送信されるまでジョブのディスパッチを遅延させます。これにより、キュー投入されたジョブがまだ実行中であっても、ユーザーはアプリケーションの使用を開始できます。これは通常、メール送信など約1秒かかるジョブにのみ使用すべきです。現在のHTTPリクエスト内で処理されるため、この方法でディスパッチされたジョブは、処理されるためにキューワーカが実行されている必要はありません。

1use App\Jobs\SendNotification;
2 
3SendNotification::dispatchAfterResponse();

クロージャをdispatchし、dispatchヘルパーにafterResponseメソッドをチェーンして、HTTPレスポンスがブラウザに送信された後にクロージャを実行することもできます。

1use App\Mail\WelcomeMessage;
2use Illuminate\Support\Facades\Mail;
3 
4dispatch(function () {
5 Mail::to('[email protected]')->send(new WelcomeMessage);
6})->afterResponse();

同期ディスパッチ

ジョブを即時(同期的)にディスパッチしたい場合は、dispatchSyncメソッドを使用できます。このメソッドを使用すると、ジョブはキューに投入されず、現在のプロセス内で即時に実行されます。

1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Http\Controllers\Controller;
6use App\Jobs\ProcessPodcast;
7use App\Models\Podcast;
8use Illuminate\Http\RedirectResponse;
9use Illuminate\Http\Request;
10 
11class PodcastController extends Controller
12{
13 /**
14 * Store a new podcast.
15 */
16 public function store(Request $request): RedirectResponse
17 {
18 $podcast = Podcast::create(/* ... */);
19 
20 // Create podcast...
21 
22 ProcessPodcast::dispatchSync($podcast);
23 
24 return redirect('/podcasts');
25 }
26}

ジョブとデータベーストランザクション

データベーストランザクション内でジョブをディスパッチすることは全く問題ありませんが、ジョブが実際に正常に実行できることを確認するために特別な注意を払う必要があります。トランザクション内でジョブをディスパッチすると、親トランザクションがコミットされる前にジョブがワーカによって処理される可能性があります。この場合、データベーストランザクション中にモデルやデータベースレコードに行った更新が、まだデータベースに反映されていない可能性があります。さらに、トランザクション内で作成されたモデルやデータベースレコードがデータベースに存在しない可能性もあります。

ありがたいことに、Laravelはこの問題を回避するためのいくつかの方法を提供しています。まず、キュー接続の設定配列でafter_commit接続オプションを設定できます。

1'redis' => [
2 'driver' => 'redis',
3 // ...
4 'after_commit' => true,
5],

after_commitオプションがtrueの場合、データベーストランザクション内でジョブをディスパッチできます。しかし、Laravelは、開いている親データベーストランザクションがコミットされるまで待ってから、実際にジョブをディスパッチします。もちろん、現在開いているデータベーストランザクションがない場合、ジョブはすぐにディスパッチされます。

トランザクション中に発生した例外によりトランザクションがロールバックされた場合、そのトランザクション中にディスパッチされたジョブは破棄されます。

after_commit設定オプションをtrueに設定すると、キュー投入されたイベントリスナ、メイラブル、通知、ブロードキャストイベントも、開いているすべてのデータベーストランザクションがコミットされた後にディスパッチされるようになります。

コミットディスパッチ動作のインライン指定

after_commitキュー接続設定オプションをtrueに設定しない場合でも、特定のジョブがすべての開いているデータベーストランザクションがコミットされた後にディスパッチされるように指定できます。これを実現するには、ディスパッチ操作にafterCommitメソッドをチェーンします。

1use App\Jobs\ProcessPodcast;
2 
3ProcessPodcast::dispatch($podcast)->afterCommit();

同様に、after_commit設定オプションがtrueに設定されている場合、開いているデータベーストランザクションのコミットを待たずに、特定のジョブをすぐにディスパッチするように指示できます。

1ProcessPodcast::dispatch($podcast)->beforeCommit();

ジョブチェーン

ジョブチェーンを使用すると、プライマリジョブが正常に実行された後にシーケンスで実行されるべきキュー投入ジョブのリストを指定できます。シーケンス内の1つのジョブが失敗すると、残りのジョブは実行されません。キュー投入ジョブチェーンを実行するには、Busファサードが提供するchainメソッドを使用できます。Laravelのコマンドバスは、キュー投入ジョブディスパッチが構築されている低レベルのコンポーネントです。

1use App\Jobs\OptimizePodcast;
2use App\Jobs\ProcessPodcast;
3use App\Jobs\ReleasePodcast;
4use Illuminate\Support\Facades\Bus;
5 
6Bus::chain([
7 new ProcessPodcast,
8 new OptimizePodcast,
9 new ReleasePodcast,
10])->dispatch();

ジョブクラスのインスタンスをチェーンするだけでなく、クロージャをチェーンすることもできます。

1Bus::chain([
2 new ProcessPodcast,
3 new OptimizePodcast,
4 function () {
5 Podcast::update(/* ... */);
6 },
7])->dispatch();

ジョブ内で$this->delete()メソッドを使用してジョブを削除しても、チェーンされたジョブの処理は妨げられません。チェーンは、チェーン内のジョブが失敗した場合にのみ実行を停止します。

チェーンの接続とキュー

チェーンされたジョブに使用する接続とキューを指定したい場合は、onConnectionおよびonQueueメソッドを使用できます。これらのメソッドは、キュー投入されたジョブに明示的に異なる接続/キューが割り当てられていない限り、使用されるキュー接続とキュー名を指定します。

1Bus::chain([
2 new ProcessPodcast,
3 new OptimizePodcast,
4 new ReleasePodcast,
5])->onConnection('redis')->onQueue('podcasts')->dispatch();

チェーンへのジョブの追加

時として、そのチェーン内の別のジョブから既存のジョブチェーンにジョブを先頭または末尾に追加する必要がある場合があります。これは、prependToChainおよびappendToChainメソッドを使用して実現できます。

1/**
2 * Execute the job.
3 */
4public function handle(): void
5{
6 // ...
7 
8 // Prepend to the current chain, run job immediately after current job...
9 $this->prependToChain(new TranscribePodcast);
10 
11 // Append to the current chain, run job at end of chain...
12 $this->appendToChain(new TranscribePodcast);
13}

チェーンの失敗

ジョブをチェーンする場合、catchメソッドを使用して、チェーン内のジョブが失敗した場合に呼び出されるクロージャを指定できます。指定されたコールバックは、ジョブの失敗を引き起こしたThrowableインスタンスを受け取ります。

1use Illuminate\Support\Facades\Bus;
2use Throwable;
3 
4Bus::chain([
5 new ProcessPodcast,
6 new OptimizePodcast,
7 new ReleasePodcast,
8])->catch(function (Throwable $e) {
9 // A job within the chain has failed...
10})->dispatch();

チェーンコールバックはシリアライズされ、後でLaravelキューによって実行されるため、チェーンコールバック内で$this変数を使用しないでください。

キューと接続のカスタマイズ

特定のキューへのディスパッチ

ジョブを異なるキューにプッシュすることで、キュー投入されたジョブを「分類」し、さまざまなキューに割り当てるワーカの数を優先順位付けすることもできます。これは、キュー設定ファイルで定義された異なるキュー「接続」にジョブをプッシュするのではなく、単一の接続内の特定のキューにのみプッシュすることに注意してください。キューを指定するには、ジョブをディスパッチするときにonQueueメソッドを使用します。

1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Http\Controllers\Controller;
6use App\Jobs\ProcessPodcast;
7use App\Models\Podcast;
8use Illuminate\Http\RedirectResponse;
9use Illuminate\Http\Request;
10 
11class PodcastController extends Controller
12{
13 /**
14 * Store a new podcast.
15 */
16 public function store(Request $request): RedirectResponse
17 {
18 $podcast = Podcast::create(/* ... */);
19 
20 // Create podcast...
21 
22 ProcessPodcast::dispatch($podcast)->onQueue('processing');
23 
24 return redirect('/podcasts');
25 }
26}

あるいは、ジョブのコンストラクタ内でonQueueメソッドを呼び出すことで、ジョブのキューを指定することもできます。

1<?php
2 
3namespace App\Jobs;
4 
5use Illuminate\Contracts\Queue\ShouldQueue;
6use Illuminate\Foundation\Queue\Queueable;
7 
8class ProcessPodcast implements ShouldQueue
9{
10 use Queueable;
11 
12 /**
13 * Create a new job instance.
14 */
15 public function __construct()
16 {
17 $this->onQueue('processing');
18 }
19}

特定の接続へのディスパッチ

アプリケーションが複数のキュー接続と対話する場合、onConnectionメソッドを使用して、ジョブをどの接続にプッシュするかを指定できます。

1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Http\Controllers\Controller;
6use App\Jobs\ProcessPodcast;
7use App\Models\Podcast;
8use Illuminate\Http\RedirectResponse;
9use Illuminate\Http\Request;
10 
11class PodcastController extends Controller
12{
13 /**
14 * Store a new podcast.
15 */
16 public function store(Request $request): RedirectResponse
17 {
18 $podcast = Podcast::create(/* ... */);
19 
20 // Create podcast...
21 
22 ProcessPodcast::dispatch($podcast)->onConnection('sqs');
23 
24 return redirect('/podcasts');
25 }
26}

onConnectiononQueueメソッドをチェーンして、ジョブの接続とキューを指定できます。

1ProcessPodcast::dispatch($podcast)
2 ->onConnection('sqs')
3 ->onQueue('processing');

あるいは、ジョブのコンストラクタ内でonConnectionメソッドを呼び出すことで、ジョブの接続を指定することもできます。

1<?php
2 
3namespace App\Jobs;
4 
5use Illuminate\Contracts\Queue\ShouldQueue;
6use Illuminate\Foundation\Queue\Queueable;
7 
8class ProcessPodcast implements ShouldQueue
9{
10 use Queueable;
11 
12 /**
13 * Create a new job instance.
14 */
15 public function __construct()
16 {
17 $this->onConnection('sqs');
18 }
19}

最大試行回数/タイムアウト値の指定

最大試行回数

キュー投入されたジョブの1つがエラーに遭遇している場合、無期限に再試行し続けることは望ましくないでしょう。そのため、Laravelはジョブが何回またはどのくらいの期間試行されるかを指定するさまざまな方法を提供しています。

ジョブが試行される最大回数を指定する1つの方法は、Artisanコマンドラインの--triesスイッチを使用することです。これは、処理中のジョブが試行回数を指定しない限り、ワーカによって処理されるすべてのジョブに適用されます。

1php artisan queue:work --tries=3

ジョブが最大試行回数を超えた場合、「失敗した」ジョブと見なされます。失敗したジョブの処理に関する詳細については、失敗したジョブのドキュメントを参照してください。queue:workコマンドに--tries=0が指定された場合、ジョブは無期限に再試行されます。

ジョブクラス自体でジョブが試行される最大回数を定義することで、より詳細なアプローチを取ることができます。ジョブで最大試行回数が指定されている場合、コマンドラインで提供される--tries値よりも優先されます。

1<?php
2 
3namespace App\Jobs;
4 
5class ProcessPodcast implements ShouldQueue
6{
7 /**
8 * The number of times the job may be attempted.
9 *
10 * @var int
11 */
12 public $tries = 5;
13}

特定のジョブの最大試行回数を動的に制御する必要がある場合は、ジョブにtriesメソッドを定義できます。

1/**
2 * Determine number of times the job may be attempted.
3 */
4public function tries(): int
5{
6 return 5;
7}

時間ベースの試行

ジョブが失敗するまでに何回試行されるかを定義する代わりに、ジョブが試行されなくなる時間を定義できます。これにより、特定の期間内にジョブを何回でも試行できます。ジョブが試行されなくなる時間を定義するには、ジョブクラスにretryUntilメソッドを追加します。このメソッドはDateTimeインスタンスを返す必要があります。

1use DateTime;
2 
3/**
4 * Determine the time at which the job should timeout.
5 */
6public function retryUntil(): DateTime
7{
8 return now()->addMinutes(10);
9}

キュー投入されたイベントリスナtriesプロパティまたはretryUntilメソッドを定義することもできます。

最大例外数

ジョブを何度も試行できるが、再試行が特定の数の未処理例外によってトリガーされた場合(releaseメソッドによって直接解放されるのではなく)に失敗するように指定したい場合があります。これを実現するために、ジョブクラスにmaxExceptionsプロパティを定義できます。

1<?php
2 
3namespace App\Jobs;
4 
5use Illuminate\Support\Facades\Redis;
6 
7class ProcessPodcast implements ShouldQueue
8{
9 /**
10 * The number of times the job may be attempted.
11 *
12 * @var int
13 */
14 public $tries = 25;
15 
16 /**
17 * The maximum number of unhandled exceptions to allow before failing.
18 *
19 * @var int
20 */
21 public $maxExceptions = 3;
22 
23 /**
24 * Execute the job.
25 */
26 public function handle(): void
27 {
28 Redis::throttle('key')->allow(10)->every(60)->then(function () {
29 // Lock obtained, process the podcast...
30 }, function () {
31 // Unable to obtain lock...
32 return $this->release(10);
33 });
34 }
35}

この例では、アプリケーションがRedisロックを取得できない場合、ジョブは10秒間解放され、最大25回まで再試行され続けます。しかし、ジョブによって3つの未処理例外がスローされた場合、ジョブは失敗します。

タイムアウト

多くの場合、キュー投入されたジョブがどのくらいの時間がかかるかおおよそわかっています。このため、Laravelでは「タイムアウト」値を指定できます。デフォルトでは、タイムアウト値は60秒です。ジョブがタイムアウト値で指定された秒数より長く処理されている場合、ジョブを処理しているワーカはエラーで終了します。通常、ワーカはサーバに設定されたプロセスマネージャによって自動的に再起動されます。

ジョブが実行できる最大秒数は、Artisanコマンドラインの--timeoutスイッチを使用して指定できます。

1php artisan queue:work --timeout=30

ジョブが継続的にタイムアウトすることで最大試行回数を超えた場合、失敗としてマークされます。

ジョブクラス自体でジョブが実行を許可される最大秒数を定義することもできます。ジョブでタイムアウトが指定されている場合、コマンドラインで指定されたタイムアウトよりも優先されます。

1<?php
2 
3namespace App\Jobs;
4 
5class ProcessPodcast implements ShouldQueue
6{
7 /**
8 * The number of seconds the job can run before timing out.
9 *
10 * @var int
11 */
12 public $timeout = 120;
13}

ソケットや発信HTTP接続などのIOブロッキングプロセスは、指定したタイムアウトを尊重しない場合があります。したがって、これらの機能を使用する場合は、常にAPIを使用してタイムアウトを指定するように試みる必要があります。たとえば、Guzzleを使用する場合は、常に接続とリクエストのタイムアウト値を指定する必要があります。

ジョブのタイムアウトを指定するには、pcntl PHP拡張をインストールする必要があります。さらに、ジョブの「タイムアウト」値は、常にその「再試行後」の値よりも短くする必要があります。そうしないと、ジョブが実際に実行を完了するかタイムアウトする前に再試行される可能性があります。

タイムアウト時に失敗

タイムアウト時にジョブを失敗としてマークすることを示したい場合は、ジョブクラスに$failOnTimeoutプロパティを定義できます。

1/**
2 * Indicate if the job should be marked as failed on timeout.
3 *
4 * @var bool
5 */
6public $failOnTimeout = true;

エラー処理

ジョブの処理中に例外がスローされると、ジョブは自動的にキューに戻され、再試行されます。ジョブは、アプリケーションで許可されている最大試行回数に達するまで解放され続けます。最大試行回数は、queue:work Artisanコマンドで使用される--triesスイッチによって定義されます。あるいは、最大試行回数はジョブクラス自体で定義することもできます。キューワーカの実行に関する詳細は以下にあります

ジョブの手動解放

ジョブを後で再試行できるように、手動でキューに戻したい場合があります。これは、releaseメソッドを呼び出すことで実現できます。

1/**
2 * Execute the job.
3 */
4public function handle(): void
5{
6 // ...
7 
8 $this->release();
9}

デフォルトでは、releaseメソッドはジョブを即時処理のためにキューに戻します。しかし、releaseメソッドに整数または日付インスタンスを渡すことで、指定された秒数が経過するまでジョブを処理可能にしないようにキューに指示できます。

1$this->release(10);
2 
3$this->release(now()->addSeconds(10));

ジョブの手動失敗

時として、ジョブを「失敗」として手動でマークする必要がある場合があります。そのためには、failメソッドを呼び出すことができます。

1/**
2 * Execute the job.
3 */
4public function handle(): void
5{
6 // ...
7 
8 $this->fail();
9}

キャッチした例外のためにジョブを失敗としてマークしたい場合は、例外をfailメソッドに渡すことができます。または、便宜上、文字列のエラーメッセージを渡すこともでき、これは自動的に例外に変換されます。

1$this->fail($exception);
2 
3$this->fail('Something went wrong.');

失敗したジョブの詳細については、ジョブの失敗の処理に関するドキュメントを参照してください。

ジョブバッチ

Laravelのジョブバッチ機能を使用すると、ジョブのバッチを簡単に実行し、ジョブのバッチが実行を完了したときに何らかのアクションを実行できます。始める前に、完了率などのジョブバッチに関するメタ情報を含むテーブルを構築するためのデータベースマイグレーションを作成する必要があります。このマイグレーションは、make:queue-batches-table Artisanコマンドを使用して生成できます。

1php artisan make:queue-batches-table
2 
3php artisan migrate

バッチ可能なジョブの定義

バッチ可能なジョブを定義するには、通常どおりキュー投入可能なジョブを作成する必要がありますが、ジョブクラスにIlluminate\Bus\Batchableトレイトを追加する必要があります。このトレイトは、ジョブが実行されている現在のバッチを取得するために使用できるbatchメソッドへのアクセスを提供します。

1<?php
2 
3namespace App\Jobs;
4 
5use Illuminate\Bus\Batchable;
6use Illuminate\Contracts\Queue\ShouldQueue;
7use Illuminate\Foundation\Queue\Queueable;
8 
9class ImportCsv implements ShouldQueue
10{
11 use Batchable, Queueable;
12 
13 /**
14 * Execute the job.
15 */
16 public function handle(): void
17 {
18 if ($this->batch()->cancelled()) {
19 // Determine if the batch has been cancelled...
20 
21 return;
22 }
23 
24 // Import a portion of the CSV file...
25 }
26}

バッチのディスパッチ

ジョブのバッチをディスパッチするには、Busファサードのbatchメソッドを使用する必要があります。もちろん、バッチ処理は完了コールバックと組み合わせることで主に役立ちます。したがって、thencatchfinallyメソッドを使用して、バッチの完了コールバックを定義できます。これらの各コールバックは、呼び出されるときにIlluminate\Bus\Batchインスタンスを受け取ります。この例では、CSVファイルから指定された行数をそれぞれ処理するジョブのバッチをキューに入れていると想像します。

1use App\Jobs\ImportCsv;
2use Illuminate\Bus\Batch;
3use Illuminate\Support\Facades\Bus;
4use Throwable;
5 
6$batch = Bus::batch([
7 new ImportCsv(1, 100),
8 new ImportCsv(101, 200),
9 new ImportCsv(201, 300),
10 new ImportCsv(301, 400),
11 new ImportCsv(401, 500),
12])->before(function (Batch $batch) {
13 // The batch has been created but no jobs have been added...
14})->progress(function (Batch $batch) {
15 // A single job has completed successfully...
16})->then(function (Batch $batch) {
17 // All jobs completed successfully...
18})->catch(function (Batch $batch, Throwable $e) {
19 // First batch job failure detected...
20})->finally(function (Batch $batch) {
21 // The batch has finished executing...
22})->dispatch();
23 
24return $batch->id;

$batch->idプロパティを介してアクセスできるバッチのIDは、ディスパッチされた後にバッチに関する情報をLaravelコマンドバスに問い合わせるために使用できます。

バッチコールバックはシリアライズされ、後でLaravelキューによって実行されるため、コールバック内で$this変数を使用しないでください。さらに、バッチ処理されたジョブはデータベーストランザクション内でラップされるため、暗黙的なコミットをトリガーするデータベースステートメントはジョブ内で実行しないでください。

バッチの命名

Laravel HorizonやLaravel Telescopeなどの一部のツールは、バッチに名前が付けられている場合、バッチに対してよりユーザーフレンドリーなデバッグ情報を提供することがあります。バッチに任意の名前を割り当てるには、バッチを定義するときにnameメソッドを呼び出すことができます。

1$batch = Bus::batch([
2 // ...
3])->then(function (Batch $batch) {
4 // All jobs completed successfully...
5})->name('Import CSV')->dispatch();

バッチの接続とキュー

バッチ処理されたジョブに使用する接続とキューを指定したい場合は、onConnectionおよびonQueueメソッドを使用できます。すべてのバッチ処理されたジョブは、同じ接続とキュー内で実行する必要があります。

1$batch = Bus::batch([
2 // ...
3])->then(function (Batch $batch) {
4 // All jobs completed successfully...
5})->onConnection('redis')->onQueue('imports')->dispatch();

チェーンとバッチ

チェーンされたジョブのセットをバッチ内で定義するには、チェーンされたジョブを配列内に配置します。たとえば、2つのジョブチェーンを並行して実行し、両方のジョブチェーンが処理を完了したときにコールバックを実行できます。

1use App\Jobs\ReleasePodcast;
2use App\Jobs\SendPodcastReleaseNotification;
3use Illuminate\Bus\Batch;
4use Illuminate\Support\Facades\Bus;
5 
6Bus::batch([
7 [
8 new ReleasePodcast(1),
9 new SendPodcastReleaseNotification(1),
10 ],
11 [
12 new ReleasePodcast(2),
13 new SendPodcastReleaseNotification(2),
14 ],
15])->then(function (Batch $batch) {
16 // ...
17})->dispatch();

逆に、チェーン内でジョブのバッチを実行するには、チェーン内にバッチを定義します。たとえば、まず複数のポッドキャストをリリースするためのジョブのバッチを実行し、次にリリース通知を送信するためのジョブのバッチを実行できます。

1use App\Jobs\FlushPodcastCache;
2use App\Jobs\ReleasePodcast;
3use App\Jobs\SendPodcastReleaseNotification;
4use Illuminate\Support\Facades\Bus;
5 
6Bus::chain([
7 new FlushPodcastCache,
8 Bus::batch([
9 new ReleasePodcast(1),
10 new ReleasePodcast(2),
11 ]),
12 Bus::batch([
13 new SendPodcastReleaseNotification(1),
14 new SendPodcastReleaseNotification(2),
15 ]),
16])->dispatch();

バッチへのジョブ追加

バッチ処理されたジョブ内からバッチに追加のジョブを追加すると便利な場合があります。このパターンは、Webリクエスト中にディスパッチするのに時間がかかりすぎる可能性のある何千ものジョブをバッチ処理する必要がある場合に役立ちます。そこで、代わりに、さらに多くのジョブでバッチをハイドレートする「ローダー」ジョブの初期バッチをディスパッチすることができます。

1$batch = Bus::batch([
2 new LoadImportBatch,
3 new LoadImportBatch,
4 new LoadImportBatch,
5])->then(function (Batch $batch) {
6 // All jobs completed successfully...
7})->name('Import Contacts')->dispatch();

この例では、LoadImportBatchジョブを使用して、バッチに追加のジョブをハイドレートします。これを実現するために、ジョブのbatchメソッドを介してアクセスできるバッチインスタンスでaddメソッドを使用できます。

1use App\Jobs\ImportContacts;
2use Illuminate\Support\Collection;
3 
4/**
5 * Execute the job.
6 */
7public function handle(): void
8{
9 if ($this->batch()->cancelled()) {
10 return;
11 }
12 
13 $this->batch()->add(Collection::times(1000, function () {
14 return new ImportContacts;
15 }));
16}

ジョブは、同じバッチに属するジョブ内からのみバッチに追加できます。

バッチの調査

バッチ完了コールバックに提供されるIlluminate\Bus\Batchインスタンスには、特定のジョブのバッチと対話し、検査するのに役立つさまざまなプロパティとメソッドがあります。

1// The UUID of the batch...
2$batch->id;
3 
4// The name of the batch (if applicable)...
5$batch->name;
6 
7// The number of jobs assigned to the batch...
8$batch->totalJobs;
9 
10// The number of jobs that have not been processed by the queue...
11$batch->pendingJobs;
12 
13// The number of jobs that have failed...
14$batch->failedJobs;
15 
16// The number of jobs that have been processed thus far...
17$batch->processedJobs();
18 
19// The completion percentage of the batch (0-100)...
20$batch->progress();
21 
22// Indicates if the batch has finished executing...
23$batch->finished();
24 
25// Cancel the execution of the batch...
26$batch->cancel();
27 
28// Indicates if the batch has been cancelled...
29$batch->cancelled();

ルートからのバッチの返却

すべてのIlluminate\Bus\BatchインスタンスはJSONシリアライズ可能であり、つまり、アプリケーションのルートの1つから直接返すことで、完了進捗を含むバッチに関する情報を含むJSONペイロードを取得できます。これにより、アプリケーションのUIでバッチの完了進捗に関する情報を表示することが便利になります。

IDでバッチを取得するには、BusファサードのfindBatchメソッドを使用できます。

1use Illuminate\Support\Facades\Bus;
2use Illuminate\Support\Facades\Route;
3 
4Route::get('/batch/{batchId}', function (string $batchId) {
5 return Bus::findBatch($batchId);
6});

バッチのキャンセル

特定のバッチの実行をキャンセルする必要がある場合があります。これは、Illuminate\Bus\Batchインスタンスでcancelメソッドを呼び出すことで実現できます。

1/**
2 * Execute the job.
3 */
4public function handle(): void
5{
6 if ($this->user->exceedsImportLimit()) {
7 return $this->batch()->cancel();
8 }
9 
10 if ($this->batch()->cancelled()) {
11 return;
12 }
13}

前の例で気づいたかもしれませんが、バッチ処理されたジョブは通常、実行を続行する前に対応するバッチがキャンセルされたかどうかを判断する必要があります。しかし、便宜上、代わりにSkipIfBatchCancelled ミドルウェアをジョブに割り当てることができます。その名前が示すように、このミドルウェアは、対応するバッチがキャンセルされた場合、ジョブを処理しないようにLaravelに指示します。

1use Illuminate\Queue\Middleware\SkipIfBatchCancelled;
2 
3/**
4 * Get the middleware the job should pass through.
5 */
6public function middleware(): array
7{
8 return [new SkipIfBatchCancelled];
9}

バッチの失敗

バッチ処理されたジョブが失敗すると、(割り当てられている場合)catchコールバックが呼び出されます。このコールバックは、バッチ内で最初に失敗したジョブに対してのみ呼び出されます。

失敗の許可

バッチ内のジョブが失敗すると、Laravelは自動的にバッチを「キャンセル済み」としてマークします。必要に応じて、この動作を無効にして、ジョブの失敗が自動的にバッチをキャンセル済みとしてマークしないようにすることができます。これは、バッチをディスパッチするときにallowFailuresメソッドを呼び出すことで実現できます。

1$batch = Bus::batch([
2 // ...
3])->then(function (Batch $batch) {
4 // All jobs completed successfully...
5})->allowFailures()->dispatch();

失敗したバッチジョブの再試行

便宜上、Laravelはqueue:retry-batch Artisanコマンドを提供しており、特定のバッチの失敗したすべてのジョブを簡単に再試行できます。queue:retry-batchコマンドは、失敗したジョブを再試行するバッチのUUIDを受け入れます。

1php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5

バッチの整理

整理しないと、job_batchesテーブルは非常に速くレコードを蓄積する可能性があります。これを軽減するために、queue:prune-batches Artisanコマンドを毎日実行するようにスケジュールする必要があります。

1use Illuminate\Support\Facades\Schedule;
2 
3Schedule::command('queue:prune-batches')->daily();

デフォルトでは、24時間以上経過したすべての終了したバッチが整理されます。コマンドを呼び出すときにhoursオプションを使用して、バッチデータを保持する期間を決定できます。たとえば、次のコマンドは、48時間以上前に終了したすべてのバッチを削除します。

1use Illuminate\Support\Facades\Schedule;
2 
3Schedule::command('queue:prune-batches --hours=48')->daily();

時として、jobs_batchesテーブルは、正常に完了しなかったバッチ(たとえば、ジョブが失敗し、そのジョブが正常に再試行されなかったバッチなど)のバッチレコードを蓄積することがあります。queue:prune-batchesコマンドに、unfinishedオプションを使用してこれらの未完了のバッチレコードを整理するように指示できます。

1use Illuminate\Support\Facades\Schedule;
2 
3Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();

同様に、jobs_batchesテーブルはキャンセルされたバッチのバッチレコードも蓄積することがあります。queue:prune-batchesコマンドに、cancelledオプションを使用してこれらのキャンセルされたバッチレコードを整理するように指示できます。

1use Illuminate\Support\Facades\Schedule;
2 
3Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();

DynamoDBへのバッチ保存

Laravelは、リレーショナルデータベースの代わりにDynamoDBにバッチのメタ情報を保存するサポートも提供しています。ただし、すべてのバッチレコードを保存するためのDynamoDBテーブルを手動で作成する必要があります。

通常、このテーブルはjob_batchesという名前であるべきですが、アプリケーションのqueue設定ファイル内のqueue.batching.table設定値に基づいてテーブルに名前を付ける必要があります。

DynamoDBバッチテーブルの設定

job_batchesテーブルには、applicationという名前の文字列プライマリパーティションキーと、idという名前の文字列プライマリソートキーが必要です。キーのapplication部分には、アプリケーションのapp設定ファイル内のname設定値で定義されたアプリケーション名が含まれます。アプリケーション名はDynamoDBテーブルのキーの一部であるため、同じテーブルを使用して複数のLaravelアプリケーションのジョブバッチを保存できます。

さらに、自動バッチ整理を利用したい場合は、テーブルにttl属性を定義できます。

DynamoDBの設定

次に、LaravelアプリケーションがAmazon DynamoDBと通信できるようにAWS SDKをインストールします。

1composer require aws/aws-sdk-php

次に、queue.batching.driver設定オプションの値をdynamodbに設定します。さらに、batching設定配列内にkeysecretregion設定オプションを定義する必要があります。これらのオプションはAWSでの認証に使用されます。dynamodbドライバを使用する場合、queue.batching.database設定オプションは不要です。

1'batching' => [
2 'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
3 'key' => env('AWS_ACCESS_KEY_ID'),
4 'secret' => env('AWS_SECRET_ACCESS_KEY'),
5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
6 'table' => 'job_batches',
7],

DynamoDBでのバッチ整理

ジョブバッチ情報を保存するためにDynamoDBを利用する場合、リレーショナルデータベースに保存されたバッチを整理するために使用される通常の整理コマンドは機能しません。代わりに、DynamoDBのネイティブTTL機能を利用して、古いバッチのレコードを自動的に削除できます。

DynamoDBテーブルをttl属性で定義した場合、バッチレコードを整理する方法をLaravelに指示するための設定パラメータを定義できます。queue.batching.ttl_attribute設定値はTTLを保持する属性の名前を定義し、queue.batching.ttl設定値は、レコードが最後に更新された時点から相対的に、バッチレコードをDynamoDBテーブルから削除できるまでの秒数を定義します。

1'batching' => [
2 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
3 'key' => env('AWS_ACCESS_KEY_ID'),
4 'secret' => env('AWS_SECRET_ACCESS_KEY'),
5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
6 'table' => 'job_batches',
7 'ttl_attribute' => 'ttl',
8 'ttl' => 60 * 60 * 24 * 7, // 7 days...
9],

クロージャのキューイング

ジョブクラスをキューにディスパッチする代わりに、クロージャをディスパッチすることもできます。これは、現在のリクエストサイクルの外で実行する必要がある、迅速で簡単なタスクに最適です。クロージャをキューにディスパッチする場合、クロージャのコード内容は暗号化署名されるため、転送中に変更することはできません。

1$podcast = App\Podcast::find(1);
2 
3dispatch(function () use ($podcast) {
4 $podcast->publish();
5});

catchメソッドを使用すると、キュー投入されたクロージャがキューの設定されたすべての再試行を使い果たした後に正常に完了できなかった場合に実行されるクロージャを提供できます。

1use Throwable;
2 
3dispatch(function () use ($podcast) {
4 $podcast->publish();
5})->catch(function (Throwable $e) {
6 // This job has failed...
7});

catchコールバックはシリアライズされ、後でLaravelキューによって実行されるため、catchコールバック内で$this変数を使用しないでください。

キューワーカの実行

queue:workコマンド

Laravelには、キューワーカを起動し、キューにプッシュされた新しいジョブを処理するArtisanコマンドが含まれています。queue:work Artisanコマンドを使用してワーカを実行できます。queue:workコマンドが開始されると、手動で停止するか、ターミナルを閉じるまで実行され続けることに注意してください。

1php artisan queue:work

queue:workプロセスをバックグラウンドで永続的に実行し続けるには、Supervisorなどのプロセスマネージャを使用して、キューワーカが停止しないようにする必要があります。

queue:workコマンドを呼び出すときに-vフラグを含めると、処理されたジョブIDをコマンドの出力に含めることができます。

1php artisan queue:work -v

キューワーカは長期間実行されるプロセスであり、起動したアプリケーションの状態をメモリに保存することに注意してください。その結果、開始後にコードベースの変更に気づきません。したがって、デプロイプロセス中にキューワーカを再起動するようにしてください。さらに、アプリケーションによって作成または変更された静的な状態は、ジョブ間で自動的にリセットされないことを覚えておいてください。

あるいは、queue:listenコマンドを実行することもできます。queue:listenコマンドを使用する場合、更新されたコードをリロードしたり、アプリケーションの状態をリセットしたりするときにワーカを手動で再起動する必要はありません。ただし、このコマンドはqueue:workコマンドよりも大幅に効率が低いです。

1php artisan queue:listen

複数のキューワーカの実行

キューに複数のワーカを割り当ててジョブを同時に処理するには、複数のqueue:workプロセスを開始するだけです。これは、ターミナルの複数のタブを介してローカルで行うか、本番環境ではプロセスマネージャの設定を使用して行うことができます。Supervisorを使用する場合は、numprocs設定値を使用できます。

接続とキューの指定

ワーカが利用すべきキュー接続を指定することもできます。workコマンドに渡される接続名は、config/queue.php設定ファイルで定義されている接続の1つに対応する必要があります。

1php artisan queue:work redis

デフォルトでは、queue:workコマンドは特定の接続のデフォルトキューのジョブのみを処理します。しかし、特定の接続に対して特定のキューのみを処理するようにキューワーカをさらにカスタマイズできます。たとえば、すべてのメールがredisキュー接続のemailsキューで処理される場合、次のコマンドを発行して、そのキューのみを処理するワーカを開始できます。

1php artisan queue:work redis --queue=emails

指定された数のジョブの処理

--onceオプションを使用して、ワーカにキューから1つのジョブのみを処理するように指示できます。

1php artisan queue:work --once

--max-jobsオプションを使用して、ワーカに指定された数のジョブを処理してから終了するように指示できます。このオプションは、Supervisorと組み合わせると、指定された数のジョブを処理した後にワーカが自動的に再起動され、蓄積されたメモリを解放できるため、便利です。

1php artisan queue:work --max-jobs=1000

キュー投入されたすべてのジョブを処理してから終了

--stop-when-emptyオプションを使用して、ワーカにすべてのジョブを処理してから正常に終了するように指示できます。このオプションは、キューが空になった後にコンテナをシャットダウンしたい場合に、Dockerコンテナ内でLaravelキューを処理するときに便利です。

1php artisan queue:work --stop-when-empty

指定された秒数のジョブの処理

--max-timeオプションを使用して、ワーカに指定された秒数ジョブを処理してから終了するように指示できます。このオプションは、Supervisorと組み合わせると、指定された時間ジョブを処理した後にワーカが自動的に再起動され、蓄積されたメモリを解放できるため、便利です。

1# Process jobs for one hour and then exit...
2php artisan queue:work --max-time=3600

ワーカのスリープ時間

キューにジョブがある場合、ワーカはジョブ間に遅延なくジョブを処理し続けます。しかし、sleepオプションは、利用可能なジョブがない場合にワーカが「スリープ」する秒数を決定します。もちろん、スリープ中、ワーカは新しいジョブを処理しません。

1php artisan queue:work --sleep=3

メンテナンスモードとキュー

アプリケーションがメンテナンスモードの間、キュー投入されたジョブは処理されません。アプリケーションがメンテナンスモードを解除されると、ジョブは通常どおり処理され続けます。

メンテナンスモードが有効になっている場合でもキューワーカにジョブを強制的に処理させるには、--forceオプションを使用できます。

1php artisan queue:work --force

リソースに関する考慮事項

デーモンキューワーカは、各ジョブを処理する前にフレームワークを「再起動」しません。したがって、各ジョブが完了した後に重いリソースを解放する必要があります。たとえば、GDライブラリで画像操作を行っている場合、画像の処理が終わったらimagedestroyでメモリを解放する必要があります。

キューの優先順位

キューの処理方法に優先順位を付けたい場合があります。たとえば、config/queue.php設定ファイルで、redis接続のデフォルトのqueuelowに設定できます。しかし、時々、ジョブをhigh優先度のキューにプッシュしたい場合があります。

1dispatch((new Job)->onQueue('high'));

highキューのすべてのジョブが処理されてからlowキューのジョブに進むことを確認するワーカを開始するには、workコマンドにキュー名をカンマ区切りリストで渡します。

1php artisan queue:work --queue=high,low

キューワーカとデプロイ

キューワーカは長期間実行されるプロセスであるため、再起動しない限りコードの変更に気づきません。したがって、キューワーカを使用するアプリケーションをデプロイする最も簡単な方法は、デプロイプロセス中にワーカを再起動することです。queue:restartコマンドを発行することで、すべてのワーカを正常に再起動できます。

1php artisan queue:restart

このコマンドは、すべてのキューワーカに現在のジョブの処理を終えた後に正常に終了するように指示するため、既存のジョブは失われません。queue:restartコマンドが実行されるとキューワーカは終了するため、Supervisorなどのプロセスマネージャを実行して、キューワーカを自動的に再起動する必要があります。

キューは、再起動シグナルを保存するためにキャッシュを使用します。したがって、この機能を使用する前に、アプリケーションにキャッシュドライバが正しく設定されていることを確認する必要があります。

ジョブの有効期限とタイムアウト

ジョブの有効期限

config/queue.php設定ファイルでは、各キュー接続にretry_afterオプションが定義されています。このオプションは、処理中のジョブを再試行する前にキュー接続が待機する秒数を指定します。たとえば、retry_afterの値が90に設定されている場合、ジョブが90秒間処理されても解放または削除されない場合、ジョブはキューに戻されます。通常、retry_after値は、ジョブが合理的に処理を完了するのにかかる最大秒数に設定する必要があります。

retry_after値を含まない唯一のキュー接続はAmazon SQSです。SQSは、AWSコンソール内で管理されるデフォルトの可視性タイムアウトに基づいてジョブを再試行します。

ワーカのタイムアウト

queue:work Artisanコマンドは、--timeoutオプションを公開しています。デフォルトでは、--timeout値は60秒です。ジョブがタイムアウト値で指定された秒数より長く処理されている場合、ジョブを処理しているワーカはエラーで終了します。通常、ワーカはサーバに設定されたプロセスマネージャによって自動的に再起動されます。

1php artisan queue:work --timeout=60

retry_after設定オプションと--timeout CLIオプションは異なりますが、連携してジョブが失われず、ジョブが一度だけ正常に処理されるようにします。

--timeout値は、常にretry_after設定値よりも数秒短くする必要があります。これにより、フリーズしたジョブを処理しているワーカが、ジョブが再試行される前に常に終了することが保証されます。--timeoutオプションがretry_after設定値よりも長い場合、ジョブが2回処理される可能性があります。

Supervisorの設定

本番環境では、queue:workプロセスを実行し続ける方法が必要です。queue:workプロセスは、ワーカのタイムアウト超過やqueue:restartコマンドの実行など、さまざまな理由で停止することがあります。

このため、queue:workプロセスが終了したことを検出し、自動的に再起動できるプロセスマネージャを設定する必要があります。さらに、プロセスマネージャを使用すると、同時に実行したいqueue:workプロセスの数を指定できます。Supervisorは、Linux環境で一般的に使用されるプロセスマネージャであり、次のドキュメントでその設定方法について説明します。

Supervisorのインストール

SupervisorはLinuxオペレーティングシステム用のプロセスマネージャであり、queue:workプロセスが失敗した場合に自動的に再起動します。UbuntuにSupervisorをインストールするには、次のコマンドを使用できます。

1sudo apt-get install supervisor

Supervisorの構成と管理が大変だと感じる場合は、Laravelキューワーカを実行するための完全に管理されたプラットフォームを提供するLaravel Cloudの使用を検討してください。

Supervisorの設定

Supervisorの設定ファイルは通常、/etc/supervisor/conf.dディレクトリに保存されます。このディレクトリ内に、プロセスの監視方法をsupervisorに指示する設定ファイルをいくつでも作成できます。たとえば、queue:workプロセスを開始および監視するlaravel-worker.confファイルを作成してみましょう。

1[program:laravel-worker]
2process_name=%(program_name)s_%(process_num)02d
3command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
4autostart=true
5autorestart=true
6stopasgroup=true
7killasgroup=true
8user=forge
9numprocs=8
10redirect_stderr=true
11stdout_logfile=/home/forge/app.com/worker.log
12stopwaitsecs=3600

この例では、numprocsディレクティブはSupervisorに8つのqueue:workプロセスを実行し、それらすべてを監視し、失敗した場合は自動的に再起動するように指示します。設定のcommandディレクティブを、目的のキュー接続とワーカオプションを反映するように変更する必要があります。

stopwaitsecsの値が、最も長く実行されるジョブが消費する秒数よりも大きいことを確認してください。そうしないと、Supervisorはジョブが処理を終える前に強制終了する可能性があります。

Supervisorの起動

設定ファイルが作成されたら、次のコマンドを使用してSupervisorの設定を更新し、プロセスを開始できます。

1sudo supervisorctl reread
2 
3sudo supervisorctl update
4 
5sudo supervisorctl start "laravel-worker:*"

Supervisorの詳細については、Supervisorのドキュメントを参照してください。

失敗したジョブの処理

キュー投入されたジョブが失敗することがあります。心配しないでください、物事は常に計画どおりに進むとは限りません!Laravelには、ジョブが試行される最大回数を指定する便利な方法が含まれています。非同期ジョブがこの試行回数を超えると、failed_jobsデータベーステーブルに挿入されます。同期的にディスパッチされたジョブが失敗した場合、このテーブルには保存されず、その例外はアプリケーションによって即座に処理されます。

failed_jobsテーブルを作成するマイグレーションは、通常、新しいLaravelアプリケーションにすでに存在します。しかし、アプリケーションにこのテーブルのマイグレーションが含まれていない場合は、make:queue-failed-tableコマンドを使用してマイグレーションを作成できます。

1php artisan make:queue-failed-table
2 
3php artisan migrate

キューワーカプロセスを実行するときに、queue:workコマンドの--triesスイッチを使用して、ジョブが試行される最大回数を指定できます。--triesオプションの値を指定しない場合、ジョブは1回だけ、またはジョブクラスの$triesプロパティで指定された回数だけ試行されます。

1php artisan queue:work redis --tries=3

--backoffオプションを使用すると、例外が発生したジョブを再試行する前にLaravelが待機する秒数を指定できます。デフォルトでは、ジョブはすぐにキューに戻され、再試行されます。

1php artisan queue:work redis --tries=3 --backoff=3

例外が発生したジョブを再試行する前にLaravelが待機する秒数をジョブごとに設定したい場合は、ジョブクラスにbackoffプロパティを定義することで行うことができます。

1/**
2 * The number of seconds to wait before retrying the job.
3 *
4 * @var int
5 */
6public $backoff = 3;

ジョブのバックオフ時間を決定するためにより複雑なロジックが必要な場合は、ジョブクラスにbackoffメソッドを定義できます。

1/**
2 * Calculate the number of seconds to wait before retrying the job.
3 */
4public function backoff(): int
5{
6 return 3;
7}

backoffメソッドからバックオフ値の配列を返すことで、「指数関数的」なバックオフを簡単に設定できます。この例では、再試行の遅延は、最初の再試行では1秒、2回目の再試行では5秒、3回目の再試行では10秒、それ以上の試行が残っている場合はそれ以降のすべての再試行で10秒になります。

1/**
2 * Calculate the number of seconds to wait before retrying the job.
3 *
4 * @return array<int, int>
5 */
6public function backoff(): array
7{
8 return [1, 5, 10];
9}

失敗したジョブの後のクリーンアップ

特定のジョブが失敗した場合、ユーザーにアラートを送信したり、ジョブによって部分的に完了したアクションを元に戻したりしたい場合があります。これを実現するために、ジョブクラスにfailedメソッドを定義できます。ジョブの失敗を引き起こしたThrowableインスタンスがfailedメソッドに渡されます。

1<?php
2 
3namespace App\Jobs;
4 
5use App\Models\Podcast;
6use App\Services\AudioProcessor;
7use Illuminate\Contracts\Queue\ShouldQueue;
8use Illuminate\Foundation\Queue\Queueable;
9use Throwable;
10 
11class ProcessPodcast implements ShouldQueue
12{
13 use Queueable;
14 
15 /**
16 * Create a new job instance.
17 */
18 public function __construct(
19 public Podcast $podcast,
20 ) {}
21 
22 /**
23 * Execute the job.
24 */
25 public function handle(AudioProcessor $processor): void
26 {
27 // Process uploaded podcast...
28 }
29 
30 /**
31 * Handle a job failure.
32 */
33 public function failed(?Throwable $exception): void
34 {
35 // Send user notification of failure, etc...
36 }
37}

failedメソッドを呼び出す前にジョブの新しいインスタンスがインスタンス化されるため、handleメソッド内で発生した可能性のあるクラスプロパティの変更は失われます。

失敗したジョブの再試行

failed_jobsデータベーステーブルに挿入されたすべての失敗したジョブを表示するには、queue:failed Artisanコマンドを使用できます。

1php artisan queue:failed

queue:failedコマンドは、ジョブID、接続、キュー、失敗時刻、およびジョブに関するその他の情報をリストします。ジョブIDは、失敗したジョブを再試行するために使用できます。たとえば、IDがce7bb17c-cdd8-41f0-a8ec-7b4fef4e5eceの失敗したジョブを再試行するには、次のコマンドを発行します。

1php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece

必要に応じて、コマンドに複数のIDを渡すことができます。

1php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d

特定のキューの失敗したすべてのジョブを再試行することもできます。

1php artisan queue:retry --queue=name

失敗したすべてのジョブを再試行するには、queue:retryコマンドを実行し、IDとしてallを渡します。

1php artisan queue:retry all

失敗したジョブを削除したい場合は、queue:forgetコマンドを使用できます。

1php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d

Horizonを使用している場合は、queue:forgetコマンドの代わりにhorizon:forgetコマンドを使用して失敗したジョブを削除する必要があります。

failed_jobsテーブルからすべての失敗したジョブを削除するには、queue:flushコマンドを使用できます。

1php artisan queue:flush

存在しないモデルの無視

Eloquentモデルをジョブに注入すると、モデルはキューに配置される前に自動的にシリアライズされ、ジョブが処理されるときにデータベースから再取得されます。しかし、ジョブがワーカによる処理を待っている間にモデルが削除された場合、ジョブはModelNotFoundExceptionで失敗する可能性があります。

便宜上、ジョブのdeleteWhenMissingModelsプロパティをtrueに設定することで、存在しないモデルを持つジョブを自動的に削除することを選択できます。このプロパティがtrueに設定されている場合、Laravelは例外を発生させることなくジョブを静かに破棄します。

1/**
2 * Delete the job if its models no longer exist.
3 *
4 * @var bool
5 */
6public $deleteWhenMissingModels = true;

失敗したジョブの整理

アプリケーションのfailed_jobsテーブルのレコードを整理するには、queue:prune-failed Artisanコマンドを呼び出します。

1php artisan queue:prune-failed

デフォルトでは、24時間以上経過したすべての失敗したジョブレコードが整理されます。コマンドに--hoursオプションを指定すると、過去N時間以内に挿入された失敗したジョブレコードのみが保持されます。たとえば、次のコマンドは、48時間以上前に挿入されたすべての失敗したジョブレコードを削除します。

1php artisan queue:prune-failed --hours=48

DynamoDBへの失敗したジョブの保存

Laravelは、失敗したジョブレコードをリレーショナルデータベーステーブルの代わりにDynamoDBに保存するサポートも提供しています。ただし、失敗したすべてのジョブレコードを保存するためのDynamoDBテーブルを手動で作成する必要があります。通常、このテーブルはfailed_jobsという名前であるべきですが、アプリケーションのqueue設定ファイル内のqueue.failed.table設定値に基づいてテーブルに名前を付ける必要があります。

failed_jobsテーブルには、applicationという名前の文字列プライマリパーティションキーと、uuidという名前の文字列プライマリソートキーが必要です。キーのapplication部分には、アプリケーションのapp設定ファイル内のname設定値で定義されたアプリケーション名が含まれます。アプリケーション名はDynamoDBテーブルのキーの一部であるため、同じテーブルを使用して複数のLaravelアプリケーションの失敗したジョブを保存できます。

さらに、LaravelアプリケーションがAmazon DynamoDBと通信できるように、AWS SDKをインストールしていることを確認してください。

1composer require aws/aws-sdk-php

次に、queue.failed.driver設定オプションの値をdynamodbに設定します。さらに、失敗したジョブの設定配列内にkeysecretregion設定オプションを定義する必要があります。これらのオプションはAWSでの認証に使用されます。dynamodbドライバを使用する場合、queue.failed.database設定オプションは不要です。

1'failed' => [
2 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
3 'key' => env('AWS_ACCESS_KEY_ID'),
4 'secret' => env('AWS_SECRET_ACCESS_KEY'),
5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
6 'table' => 'failed_jobs',
7],

失敗したジョブの保存無効化

queue.failed.driver設定オプションの値をnullに設定することで、Laravelに失敗したジョブを保存せずに破棄するように指示できます。通常、これはQUEUE_FAILED_DRIVER環境変数を介して実現できます。

1QUEUE_FAILED_DRIVER=null

失敗したジョブのイベント

ジョブが失敗したときに呼び出されるイベントリスナを登録したい場合は、Queueファサードのfailingメソッドを使用できます。たとえば、Laravelに含まれているAppServiceProviderbootメソッドからこのイベントにクロージャをアタッチできます。

1<?php
2 
3namespace App\Providers;
4 
5use Illuminate\Support\Facades\Queue;
6use Illuminate\Support\ServiceProvider;
7use Illuminate\Queue\Events\JobFailed;
8 
9class AppServiceProvider extends ServiceProvider
10{
11 /**
12 * Register any application services.
13 */
14 public function register(): void
15 {
16 // ...
17 }
18 
19 /**
20 * Bootstrap any application services.
21 */
22 public function boot(): void
23 {
24 Queue::failing(function (JobFailed $event) {
25 // $event->connectionName
26 // $event->job
27 // $event->exception
28 });
29 }
30}

キューからのジョブクリア

Horizonを使用している場合は、queue:clearコマンドの代わりにhorizon:clearコマンドを使用してキューからジョブをクリアする必要があります。

デフォルト接続のデフォルトキューからすべてのジョブを削除したい場合は、queue:clear Artisanコマンドを使用して行うことができます。

1php artisan queue:clear

特定の接続とキューからジョブを削除するために、connection引数とqueueオプションを指定することもできます。

1php artisan queue:clear redis --queue=emails

キューからジョブをクリアできるのは、SQS、Redis、およびデータベースキューのドライバのみです。さらに、SQSのメッセージ削除プロセスには最大60秒かかるため、キューをクリアしてから最大60秒間にSQSキューへ送信されたジョブも削除される可能性があります。

キューの監視

キューが突然大量のジョブを受け取ると、処理が追いつかなくなり、ジョブの完了までに長い待ち時間が発生する可能性があります。必要であれば、キューのジョブ数が指定したしきい値を超えたときに、Laravelから警告を受け取れます。

まず、queue:monitorコマンドを毎分実行するようにスケジュールする必要があります。このコマンドは、監視したいキューの名前と、希望するジョブ数のしきい値を引数に取ります。

1php artisan queue:monitor redis:default,redis:deployments --max=100

このコマンドをスケジュールするだけでは、キューが過負荷状態であることを警告する通知をトリガーするには不十分です。コマンドがしきい値を超えるジョブ数を持つキューを検出すると、Illuminate\Queue\Events\QueueBusyイベントが発行されます。アプリケーションのAppServiceProvider内でこのイベントをリッスンし、あなたや開発チームに通知を送信できます。

1use App\Notifications\QueueHasLongWaitTime;
2use Illuminate\Queue\Events\QueueBusy;
3use Illuminate\Support\Facades\Event;
4use Illuminate\Support\Facades\Notification;
5 
6/**
7 * Bootstrap any application services.
8 */
9public function boot(): void
10{
11 Event::listen(function (QueueBusy $event) {
12 Notification::route('mail', '[email protected]')
13 ->notify(new QueueHasLongWaitTime(
14 $event->connection,
15 $event->queue,
16 $event->size
17 ));
18 });
19}

テスト

ジョブをディスパッチするコードをテストする際、ジョブ自体のコードはそれをディスパッチするコードとは別に直接テストできるため、Laravelにジョブ自体を実際には実行しないように指示したい場合があるでしょう。もちろん、ジョブ自体をテストするには、ジョブインスタンスを生成し、テスト内で直接handleメソッドを呼び出せます。

Queueファサードのfakeメソッドを使用すると、キュー投入されたジョブが実際にキューへプッシュされるのを防げます。Queueファサードのfakeメソッドを呼び出した後、アプリケーションがキューへジョブをプッシュしようとしたことをアサートできます。

1<?php
2 
3use App\Jobs\AnotherJob;
4use App\Jobs\FinalJob;
5use App\Jobs\ShipOrder;
6use Illuminate\Support\Facades\Queue;
7 
8test('orders can be shipped', function () {
9 Queue::fake();
10 
11 // Perform order shipping...
12 
13 // Assert that no jobs were pushed...
14 Queue::assertNothingPushed();
15 
16 // Assert a job was pushed to a given queue...
17 Queue::assertPushedOn('queue-name', ShipOrder::class);
18 
19 // Assert a job was pushed twice...
20 Queue::assertPushed(ShipOrder::class, 2);
21 
22 // Assert a job was not pushed...
23 Queue::assertNotPushed(AnotherJob::class);
24 
25 // Assert that a Closure was pushed to the queue...
26 Queue::assertClosurePushed();
27 
28 // Assert the total number of jobs that were pushed...
29 Queue::assertCount(3);
30});
1<?php
2 
3namespace Tests\Feature;
4 
5use App\Jobs\AnotherJob;
6use App\Jobs\FinalJob;
7use App\Jobs\ShipOrder;
8use Illuminate\Support\Facades\Queue;
9use Tests\TestCase;
10 
11class ExampleTest extends TestCase
12{
13 public function test_orders_can_be_shipped(): void
14 {
15 Queue::fake();
16 
17 // Perform order shipping...
18 
19 // Assert that no jobs were pushed...
20 Queue::assertNothingPushed();
21 
22 // Assert a job was pushed to a given queue...
23 Queue::assertPushedOn('queue-name', ShipOrder::class);
24 
25 // Assert a job was pushed twice...
26 Queue::assertPushed(ShipOrder::class, 2);
27 
28 // Assert a job was not pushed...
29 Queue::assertNotPushed(AnotherJob::class);
30 
31 // Assert that a Closure was pushed to the queue...
32 Queue::assertClosurePushed();
33 
34 // Assert the total number of jobs that were pushed...
35 Queue::assertCount(3);
36 }
37}

assertPushedまたはassertNotPushedメソッドにクロージャを渡すことで、特定の「真偽テスト」に合格するジョブがプッシュされたことをアサートできます。指定された真偽テストに合格するジョブが少なくとも1つプッシュされていれば、アサーションは成功します。

1Queue::assertPushed(function (ShipOrder $job) use ($order) {
2 return $job->order->id === $order->id;
3});

ジョブの一部のフェイク

他のジョブは通常通り実行させつつ、特定のジョブだけをフェイクにしたい場合は、フェイクにすべきジョブのクラス名をfakeメソッドに渡してください。

1test('orders can be shipped', function () {
2 Queue::fake([
3 ShipOrder::class,
4 ]);
5 
6 // Perform order shipping...
7 
8 // Assert a job was pushed twice...
9 Queue::assertPushed(ShipOrder::class, 2);
10});
1public function test_orders_can_be_shipped(): void
2{
3 Queue::fake([
4 ShipOrder::class,
5 ]);
6 
7 // Perform order shipping...
8 
9 // Assert a job was pushed twice...
10 Queue::assertPushed(ShipOrder::class, 2);
11}

exceptメソッドを使用すると、指定した一連のジョブを除くすべてのジョブをフェイクにできます。

1Queue::fake()->except([
2 ShipOrder::class,
3]);

ジョブチェーンのテスト

ジョブチェーンをテストするには、Busファサードのフェイク機能を利用する必要があります。BusファサードのassertChainedメソッドは、ジョブのチェーンがディスパッチされたことをアサートするために使用します。assertChainedメソッドは、最初の引数としてチェーンされたジョブの配列を受け取ります。

1use App\Jobs\RecordShipment;
2use App\Jobs\ShipOrder;
3use App\Jobs\UpdateInventory;
4use Illuminate\Support\Facades\Bus;
5 
6Bus::fake();
7 
8// ...
9 
10Bus::assertChained([
11 ShipOrder::class,
12 RecordShipment::class,
13 UpdateInventory::class
14]);

上の例でわかるように、チェーンされたジョブの配列は、ジョブのクラス名の配列で指定できます。ただし、実際のジョブインスタンスの配列を渡すことも可能です。その場合、Laravelはジョブインスタンスが同じクラスであり、アプリケーションによってディスパッチされたチェーンジョブと同じプロパティ値を持っていることを保証します。

1Bus::assertChained([
2 new ShipOrder,
3 new RecordShipment,
4 new UpdateInventory,
5]);

assertDispatchedWithoutChainメソッドを使用すると、ジョブがジョブチェーンなしでプッシュされたことをアサートできます。

1Bus::assertDispatchedWithoutChain(ShipOrder::class);

チェーンの変更をテストする

チェーンされたジョブが既存のチェーンにジョブを先頭または末尾に追加する場合、そのジョブのassertHasChainメソッドを使用して、ジョブに残っているチェーンが期待通りであることをアサートできます。

1$job = new ProcessPodcast;
2 
3$job->handle();
4 
5$job->assertHasChain([
6 new TranscribePodcast,
7 new OptimizePodcast,
8 new ReleasePodcast,
9]);

assertDoesntHaveChainメソッドは、ジョブの残りのチェーンが空であることをアサートするために使用します。

1$job->assertDoesntHaveChain();

チェーンされたバッチをテストする

ジョブチェーンにジョブのバッチが含まれている場合、チェーンのアサーション内にBus::chainedBatchの定義を挿入することで、チェーンされたバッチが期待通りであることをアサートできます。

1use App\Jobs\ShipOrder;
2use App\Jobs\UpdateInventory;
3use Illuminate\Bus\PendingBatch;
4use Illuminate\Support\Facades\Bus;
5 
6Bus::assertChained([
7 new ShipOrder,
8 Bus::chainedBatch(function (PendingBatch $batch) {
9 return $batch->jobs->count() === 3;
10 }),
11 new UpdateInventory,
12]);

ジョブバッチのテスト

BusファサードのassertBatchedメソッドは、ジョブのバッチがディスパッチされたことをアサートするために使用します。assertBatchedメソッドに渡されたクロージャは、Illuminate\Bus\PendingBatchのインスタンスを受け取り、これを使用してバッチ内のジョブを検査できます。

1use Illuminate\Bus\PendingBatch;
2use Illuminate\Support\Facades\Bus;
3 
4Bus::fake();
5 
6// ...
7 
8Bus::assertBatched(function (PendingBatch $batch) {
9 return $batch->name == 'import-csv' &&
10 $batch->jobs->count() === 10;
11});

assertBatchCountメソッドを使用して、指定した数のバッチがディスパッチされたことをアサートできます。

1Bus::assertBatchCount(3);

assertNothingBatchedを使用して、バッチがディスパッチされなかったことをアサートできます。

1Bus::assertNothingBatched();

ジョブとバッチの相互作用をテストする

さらに、個々のジョブとその基盤となるバッチとの相互作用をテストする必要がある場合もあります。たとえば、あるジョブがバッチのそれ以上の処理をキャンセルしたかどうかをテストする必要があるかもしれません。これを実現するには、withFakeBatchメソッドを介してジョブにフェイクのバッチを割り当てる必要があります。withFakeBatchメソッドは、ジョブインスタンスとフェイクのバッチを含むタプルを返します。

1[$job, $batch] = (new ShipOrder)->withFakeBatch();
2 
3$job->handle();
4 
5$this->assertTrue($batch->cancelled());
6$this->assertEmpty($batch->added);

ジョブ/キューインタラクションのテスト

キュー投入されたジョブが自身をキューに解放し直すことをテストしたり、ジョブが自身を削除したことをテストしたりする必要がある場合があります。これらのキューとの相互作用は、ジョブをインスタンス化し、withFakeQueueInteractionsメソッドを呼び出すことでテストできます。

ジョブのキューとの相互作用がフェイクにされたら、そのジョブのhandleメソッドを呼び出せます。ジョブを呼び出した後、assertReleasedassertDeletedassertNotDeletedassertFailedassertFailedWithassertNotFailedメソッドを使用して、ジョブのキューとの相互作用に対するアサーションを行えます。

1use App\Exceptions\CorruptedAudioException;
2use App\Jobs\ProcessPodcast;
3 
4$job = (new ProcessPodcast)->withFakeQueueInteractions();
5 
6$job->handle();
7 
8$job->assertReleased(delay: 30);
9$job->assertDeleted();
10$job->assertNotDeleted();
11$job->assertFailed();
12$job->assertFailedWith(CorruptedAudioException::class);
13$job->assertNotFailed();

ジョブイベント

Queueファサードbeforeおよびafterメソッドを使用して、キュー投入されたジョブが処理される前後に実行されるコールバックを指定できます。これらのコールバックは、追加のログ記録を実行したり、ダッシュボードの統計をインクリメントしたりする絶好の機会です。通常、これらのメソッドはサービスプロバイダbootメソッドから呼び出します。たとえば、Laravelに含まれているAppServiceProviderを使用できます。

1<?php
2 
3namespace App\Providers;
4 
5use Illuminate\Support\Facades\Queue;
6use Illuminate\Support\ServiceProvider;
7use Illuminate\Queue\Events\JobProcessed;
8use Illuminate\Queue\Events\JobProcessing;
9 
10class AppServiceProvider extends ServiceProvider
11{
12 /**
13 * Register any application services.
14 */
15 public function register(): void
16 {
17 // ...
18 }
19 
20 /**
21 * Bootstrap any application services.
22 */
23 public function boot(): void
24 {
25 Queue::before(function (JobProcessing $event) {
26 // $event->connectionName
27 // $event->job
28 // $event->job->payload()
29 });
30 
31 Queue::after(function (JobProcessed $event) {
32 // $event->connectionName
33 // $event->job
34 // $event->job->payload()
35 });
36 }
37}

Queueファサードloopingメソッドを使用すると、ワーカがキューからジョブを取得しようとする前に実行するコールバックを指定できます。たとえば、以前に失敗したジョブによって開かれたままになっているトランザクションをロールバックするクロージャを登録できます。

1use Illuminate\Support\Facades\DB;
2use Illuminate\Support\Facades\Queue;
3 
4Queue::looping(function () {
5 while (DB::transactionLevel() > 0) {
6 DB::rollBack();
7 }
8});

Laravelは最も生産的な方法です
ソフトウェアを構築、デプロイ、監視します。