コンテンツへスキップ

キュー

はじめに

ウェブアプリケーションの構築中、アップロードされたCSVファイルの解析と保存など、通常のウェブリクエスト中に実行するには時間がかかりすぎるタスクが発生することがあります。幸いなことに、Laravelでは、バックグラウンドで処理できるキュージョブを簡単に作成できます。時間のかかるタスクをキューに移動することで、アプリケーションはウェブリクエストに高速で応答し、顧客により良いユーザーエクスペリエンスを提供できます。

Laravelキューは、Amazon SQSRedis、さらにはリレーショナルデータベースなど、さまざまなキューバックエンドにわたって統一されたキューイングAPIを提供します。

Laravelのキュー設定オプションは、アプリケーションのconfig/queue.php設定ファイルに格納されています。このファイルには、データベース、Amazon SQSRedisBeanstalkdドライバを含む、フレームワークに含まれる各キュードライバの接続設定と、ジョブをすぐに実行する同期ドライバ(ローカル開発時に使用)が含まれています。キューイングされたジョブを破棄するnullキュードライバも含まれています。

lightbulb

Laravelは、Redis対応キューのための美しいダッシュボードと設定システムであるHorizonを提供しています。詳細については、Horizonドキュメント全文をご覧ください。

接続とキュー

Laravelキューを使い始める前に、「接続」と「キュー」の違いを理解することが重要です。config/queue.php設定ファイルには、connections設定配列があります。このオプションは、Amazon SQS、Beanstalk、Redisなどのバックエンドキューサービスへの接続を定義します。ただし、特定のキュー接続には、キューイングされたジョブの異なるスタックまたはパイルと考えることができる複数の「キュー」がある場合があります。

queue設定ファイルの各接続設定例には、queue属性が含まれていることに注意してください。これは、ジョブが特定の接続に送信されたときにディスパッチされるデフォルトのキューです。つまり、ジョブをディスパッチするキューを明示的に定義せずにジョブをディスパッチする場合、ジョブは接続のqueue属性で定義されているキューに配置されます。

use App\Jobs\ProcessPodcast;
 
// This job is sent to the default connection's default queue...
ProcessPodcast::dispatch();
 
// This job is sent to the default connection's "emails" queue...
ProcessPodcast::dispatch()->onQueue('emails');

一部のアプリケーションでは、複数のキューにジョブをプッシュする必要がなく、単一の単純なキューを使用することを好む場合があります。ただし、複数のキューにジョブをプッシュすることは、Laravelキューワーカーは優先順位によって処理するキューを指定できるため、ジョブの処理方法を優先順位付けまたはセグメント化したいアプリケーションにとって特に便利です。たとえば、highキューにジョブをプッシュする場合、より高い処理優先順位を与えるワーカーを実行できます。

php artisan queue:work --queue=high,default

ドライバに関する注意事項と前提条件

データベース

databaseキュードライバを使用するには、ジョブを保持するデータベーステーブルが必要です。通常、これはLaravelのデフォルトの0001_01_01_000002_create_jobs_table.phpデータベースマイグレーションに含まれています。ただし、アプリケーションにこのマイグレーションが含まれていない場合は、make:queue-table Artisanコマンドを使用して作成できます。

php artisan make:queue-table
 
php artisan migrate

Redis

redisキュードライバを使用するには、config/database.php設定ファイルでRedisデータベース接続を設定する必要があります。

exclamation

serializercompressionのRedisオプションは、redisキュードライバではサポートされていません。

Redisクラスタ

Redisキュー接続でRedisクラスタを使用する場合は、キュー名にキーハッシュタグを含める必要があります。これは、特定のキューのすべてのRedisキーが同じハッシュスロットに配置されるようにするために必要です。

'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', '{default}'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => null,
'after_commit' => false,
],

ブロッキング

Redisキューを使用する場合、block_for設定オプションを使用して、ジョブが利用可能になるまでドライバが待機する時間を指定し、ワーカーループを反復処理してRedisデータベースを再ポーリングする前に指定できます。

キューの負荷に応じてこの値を調整すると、Redisデータベースを継続的にポーリングするよりも効率的になります。たとえば、値を5に設定して、ドライバがジョブが利用可能になるまで5秒間ブロックすることを示すことができます。

'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => 5,
'after_commit' => false,
],
exclamation

block_for0に設定すると、キューワーカーはジョブが利用可能になるまで無期限にブロックされます。これにより、SIGTERMなどのシグナルは、次のジョブが処理されるまで処理されなくなります。

その他のドライバの前提条件

リストされたキュードライバには、次の依存関係が必要です。これらの依存関係は、Composerパッケージマネージャーを使用してインストールできます。

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~5.0
  • Redis: predis/predis ~2.0またはphpredis PHP拡張機能
  • MongoDB: mongodb/laravel-mongodb

ジョブの作成

ジョブクラスの生成

デフォルトでは、アプリケーションのすべてのキュー可能なジョブはapp/Jobsディレクトリに格納されます。app/Jobsディレクトリが存在しない場合、make:job Artisanコマンドを実行すると作成されます。

php artisan make:job ProcessPodcast

生成されたクラスはIlluminate\Contracts\Queue\ShouldQueueインターフェースを実装し、Laravelにジョブを非同期で実行するためにキューにプッシュする必要があることを示します。

lightbulb

ジョブスタブは、スタブの公開を使用してカスタマイズできます。

クラス構造

ジョブクラスは非常にシンプルで、通常はキューによってジョブが処理されると呼び出されるhandleメソッドのみを含んでいます。はじめに、ジョブクラスの例を見てみましょう。この例では、ポッドキャスト公開サービスを管理しており、公開前にアップロードされたポッドキャストファイルを処理する必要があると仮定します。

<?php
 
namespace App\Jobs;
 
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
 
class ProcessPodcast implements ShouldQueue
{
use Queueable;
 
/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
) {}
 
/**
* Execute the job.
*/
public function handle(AudioProcessor $processor): void
{
// Process uploaded podcast...
}
}

この例では、Eloquentモデルをキューに入れられたジョブのコンストラクタに直接渡すことができたことに注意してください。ジョブで使用されているQueueableトレイトのため、Eloquentモデルとそのロードされたリレーションシップは、ジョブの処理時に適切にシリアライズおよびアンシリアライズされます。

キューに入れられたジョブがコンストラクタでEloquentモデルを受け入れる場合、モデルの識別子のみがキューにシリアライズされます。ジョブが実際に処理されると、キューシステムはデータベースから完全なモデルインスタンスとそのロードされたリレーションシップを自動的に再取得します。このモデルシリアライズのアプローチにより、キュードライバに送信されるジョブペイロードをはるかに小さくすることができます。

handleメソッドの依存性注入

handleメソッドは、キューによってジョブが処理されると呼び出されます。ジョブのhandleメソッドで依存関係の型ヒントを使用できることに注意してください。Laravelのサービスコンテナは、これらの依存関係を自動的に注入します。

コンテナがhandleメソッドに依存関係を注入する方法を完全に制御したい場合は、コンテナのbindMethodメソッドを使用できます。bindMethodメソッドは、ジョブとコンテナを受け取るコールバックを受け取ります。コールバック内では、自由にhandleメソッドを呼び出すことができます。通常、これはApp\Providers\AppServiceProviderサービスプロバイダbootメソッドからこのメソッドを呼び出す必要があります。

use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;
 
$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
return $job->handle($app->make(AudioProcessor::class));
});
exclamation

生の画像コンテンツなどのバイナリデータは、キューに入れられたジョブに渡す前にbase64_encode関数を通して渡す必要があります。そうしないと、ジョブはキューに配置されるときにJSONに正しくシリアライズされない可能性があります。

キューに入れられたリレーションシップ

ロードされたすべてのEloquentモデルリレーションシップもジョブのキューイング時にシリアライズされるため、シリアライズされたジョブ文字列は非常に大きくなる場合があります。さらに、ジョブがデシリアライズされ、モデルリレーションシップがデータベースから再取得されると、それらは完全に取得されます。ジョブのキューイングプロセス中にモデルがシリアライズされる前に適用された以前のリレーションシップ制約は、ジョブがデシリアライズされるときに適用されません。したがって、特定のリレーションシップのサブセットを操作する場合は、キューに入れられたジョブ内でそのリレーションシップを再制約する必要があります。

または、リレーションのシリアライズを防ぐには、プロパティ値を設定するときにモデルでwithoutRelationsメソッドを呼び出すことができます。このメソッドは、ロードされたリレーションシップのないモデルのインスタンスを返します。

/**
* Create a new job instance.
*/
public function __construct(
Podcast $podcast,
) {
$this->podcast = $podcast->withoutRelations();
}

PHPのコンストラクタプロパティプロモーションを使用していて、Eloquentモデルのリレーションをシリアライズしないことを示したい場合は、WithoutRelations属性を使用できます。

use Illuminate\Queue\Attributes\WithoutRelations;
 
/**
* Create a new job instance.
*/
public function __construct(
#[WithoutRelations]
public Podcast $podcast,
) {}

ジョブが単一のモデルではなく、Eloquentモデルのコレクションまたは配列を受け取る場合、そのコレクション内のモデルのリレーションシップは、ジョブがデシリアライズされて実行されるときに復元されません。これは、多数のモデルを処理するジョブでの過剰なリソース使用を回避するためです。

一意のジョブ

exclamation

一意のジョブには、ロックをサポートするキャッシュドライバが必要です。現在、memcachedredisdynamodbdatabasefilearrayキャッシュドライバはアトミックロックをサポートしています。さらに、一意のジョブ制約は、バッチ内のジョブには適用されません。

特定のジョブのインスタンスがいつでも1つだけキューにあるようにしたい場合があります。これを行うには、ジョブクラスにShouldBeUniqueインターフェースを実装します。このインターフェースでは、クラスに追加のメソッドを定義する必要はありません。

<?php
 
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
 
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
}

上記の例では、UpdateSearchIndexジョブは一意です。したがって、ジョブの別のインスタンスが既にキューにあり、処理が完了していない場合、ジョブはディスパッチされません。

特定のケースでは、ジョブを一意にする特定の「キー」を定義したり、ジョブが一意ではなくなるタイムアウトを指定したりする場合があります。これを実現するには、ジョブクラスにuniqueIduniqueForのプロパティまたはメソッドを定義できます。

<?php
 
use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
 
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
/**
* The product instance.
*
* @var \App\Product
*/
public $product;
 
/**
* The number of seconds after which the job's unique lock will be released.
*
* @var int
*/
public $uniqueFor = 3600;
 
/**
* Get the unique ID for the job.
*/
public function uniqueId(): string
{
return $this->product->id;
}
}

上記の例では、UpdateSearchIndexジョブは製品IDによって一意です。したがって、同じ製品IDを持つジョブの新しいディスパッチは、既存のジョブの処理が完了するまで無視されます。さらに、既存のジョブが1時間以内に処理されない場合、一意のロックは解放され、同じ一意のキーを持つ別のジョブをキューにディスパッチできます。

exclamation

アプリケーションが複数のWebサーバーまたはコンテナからジョブをディスパッチする場合は、すべてのサーバーが同じ中央キャッシュサーバーと通信するようにして、Laravelがジョブが一意かどうかを正確に判断できるようにする必要があります。

処理開始までジョブを一意に保つ

デフォルトでは、一意のジョブは、ジョブの処理が完了するか、すべての再試行が失敗した後に「ロック解除」されます。ただし、ジョブが処理される直前にすぐにロック解除したい状況がある場合があります。これを実現するには、ShouldBeUniqueコントラクトの代わりにShouldBeUniqueUntilProcessingコントラクトを実装する必要があります。

<?php
 
use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
 
class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
// ...
}

一意のジョブロック

内部的には、ShouldBeUniqueジョブがディスパッチされると、LaravelはuniqueIdキーでロックを取得しようとします。ロックが取得されない場合、ジョブはディスパッチされません。このロックは、ジョブの処理が完了するか、すべての再試行が失敗したときに解放されます。デフォルトでは、Laravelはこのロックを取得するためにデフォルトのキャッシュドライバを使用します。ただし、ロックの取得に別のドライバを使用する場合は、使用するキャッシュドライバを返すuniqueViaメソッドを定義できます。

use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;
 
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
 
/**
* Get the cache driver for the unique job lock.
*/
public function uniqueVia(): Repository
{
return Cache::driver('redis');
}
}
lightbulb

ジョブの同時処理を制限する必要があるだけの場合、WithoutOverlappingジョブミドルウェアを使用してください。

暗号化されたジョブ

Laravelでは、暗号化を使用して、ジョブデータのプライバシーと整合性を確保できます。開始するには、ShouldBeEncryptedインターフェースをジョブクラスに追加するだけです。このインターフェースがクラスに追加されると、Laravelはキューにプッシュする前にジョブを自動的に暗号化します。

<?php
 
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;
 
class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
// ...
}

ジョブミドルウェア

ジョブミドルウェアを使用すると、キューに入れられたジョブの実行をカスタムロジックでラップし、ジョブ自体の定型的なコードを削減できます。たとえば、LaravelのRedisレート制限機能を利用して、5秒ごとに1つのジョブだけが処理されるようにする次のhandleメソッドを考えてみましょう。

use Illuminate\Support\Facades\Redis;
 
/**
* Execute the job.
*/
public function handle(): void
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
info('Lock obtained...');
 
// Handle job...
}, function () {
// Could not obtain lock...
 
return $this->release(5);
});
}

このコードは有効ですが、Redisレート制限ロジックが混在しているため、handleメソッドの実装が煩雑になります。さらに、このレート制限ロジックは、レート制限したい他のすべてのジョブに対して複製する必要があります。

handleメソッドでレート制限する代わりに、レート制限を処理するジョブミドルウェアを定義できます。Laravelにはジョブミドルウェアのデフォルトの場所がないため、アプリケーションのどこにでもジョブミドルウェアを配置できます。この例では、app/Jobs/Middlewareディレクトリにミドルウェアを配置します。

<?php
 
namespace App\Jobs\Middleware;
 
use Closure;
use Illuminate\Support\Facades\Redis;
 
class RateLimited
{
/**
* Process the queued job.
*
* @param \Closure(object): void $next
*/
public function handle(object $job, Closure $next): void
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// Lock obtained...
 
$next($job);
}, function () use ($job) {
// Could not obtain lock...
 
$job->release(5);
});
}
}

ルートミドルウェアと同様に、ジョブミドルウェアは処理されているジョブと、ジョブの処理を続行するために呼び出す必要があるコールバックを受け取ります。

ジョブミドルウェアを作成した後、ジョブのmiddlewareメソッドからそれらを返すことで、ジョブに添付できます。このメソッドは、make:job Artisanコマンドによって作成されたジョブには存在しないため、ジョブクラスに手動で追加する必要があります。

use App\Jobs\Middleware\RateLimited;
 
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited];
}
lightbulb

ジョブミドルウェアは、キューに入れられるイベントリスナー、メール、通知にも割り当てることができます。

レート制限

独自のレート制限ジョブミドルウェアの書き方を示しましたが、Laravelには実際にはジョブをレート制限するために使用できるレート制限ミドルウェアが含まれています。ルートレートリミッターと同様に、ジョブレートリミッターはRateLimiterファサードのforメソッドを使用して定義されます。

たとえば、ユーザーが1時間につき1回データをバックアップできるようにし、プレミアムユーザーにはそのような制限を課さないようにしたい場合があります。これを実現するには、AppServiceProviderbootメソッドでRateLimiterを定義できます。

use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;
 
/**
* Bootstrap any application services.
*/
public function boot(): void
{
RateLimiter::for('backups', function (object $job) {
return $job->user->vipCustomer()
? Limit::none()
: Limit::perHour(1)->by($job->user->id);
});
}

上記の例では、毎時レート制限を定義しましたが、perMinuteメソッドを使用して分単位のレート制限を簡単に定義できます。また、レート制限のbyメソッドに任意の値を渡すことができますが、この値は多くの場合、顧客ごとにレート制限をセグメント化するために使用されます。

return Limit::perMinute(50)->by($job->user->id);

レート制限を定義したら、Illuminate\Queue\Middleware\RateLimitedミドルウェアを使用してレートリミッターをジョブに添付できます。ジョブがレート制限を超えるたびに、このミドルウェアは、レート制限期間に基づいて適切な遅延でジョブをキューに戻します。

use Illuminate\Queue\Middleware\RateLimited;
 
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited('backups')];
}

レート制限されたジョブをキューに戻しても、ジョブのattemptsの総数は増加します。ジョブクラスのtriesmaxExceptionsのプロパティを適切に調整することをお勧めします。または、retryUntilメソッドを使用して、ジョブを再試行しない時間の長さを定義することもできます。

レート制限されたジョブを再試行しないようにするには、dontReleaseメソッドを使用します。

/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new RateLimited('backups'))->dontRelease()];
}
lightbulb

Redisを使用している場合は、Illuminate\Queue\Middleware\RateLimitedWithRedisミドルウェアを使用できます。これはRedis用に微調整されており、基本的なレート制限ミドルウェアよりも効率的です。

ジョブの重複の防止

Laravelには、任意のキーに基づいてジョブの重複を防ぐことができるIlluminate\Queue\Middleware\WithoutOverlappingミドルウェアが含まれています。これは、キューに入れられたジョブが1つのジョブだけで変更される必要があるリソースを変更する場合に役立ちます。

たとえば、ユーザーの信用スコアを更新するキューに入れられたジョブがあり、同じユーザーIDに対して信用スコア更新ジョブの重複を防ぎたいとします。これを実現するには、ジョブのmiddlewareメソッドからWithoutOverlappingミドルウェアを返すことができます。

use Illuminate\Queue\Middleware\WithoutOverlapping;
 
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new WithoutOverlapping($this->user->id)];
}

同じタイプの重複するジョブは、キューに戻されます。リリースされたジョブが再び試行されるまでに経過する秒数も指定できます。

/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}

重複するジョブをすぐに削除して再試行されないようにするには、dontReleaseメソッドを使用します。

/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}

WithoutOverlappingミドルウェアは、Laravelのアトミックロック機能によって実現されています。場合によっては、ジョブが予期せず失敗したりタイムアウトしたりして、ロックが解放されないことがあります。したがって、expireAfterメソッドを使用して、ロックの有効期限を明示的に定義できます。たとえば、以下の例では、Laravelにジョブの処理開始後3分後にWithoutOverlappingロックを解放するよう指示します。

/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}
exclamation

WithoutOverlappingミドルウェアには、ロックをサポートするキャッシュドライバが必要です。現在、memcachedredisdynamodbdatabasefilearrayキャッシュドライバはアトミックロックをサポートしています。

ジョブクラス間でロックキーを共有する

デフォルトでは、WithoutOverlappingミドルウェアは同じクラスの重複するジョブのみを防ぎます。したがって、2つの異なるジョブクラスが同じロックキーを使用する場合でも、重複は防止されません。ただし、sharedメソッドを使用して、Laravelにキーをジョブクラス全体に適用するように指示できます。

use Illuminate\Queue\Middleware\WithoutOverlapping;
 
class ProviderIsDown
{
// ...
 
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
 
class ProviderIsUp
{
// ...
 
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}

例外のスロットリング

Laravel には、例外のスロットリングを可能にする `Illuminate\Queue\Middleware\ThrottlesExceptions` ミドルウェアが含まれています。ジョブが指定された回数例外をスローすると、指定された時間間隔が経過するまで、ジョブを実行しようとするそれ以降の試みはすべて遅延されます。このミドルウェアは、不安定なサードパーティサービスとやり取りするジョブに特に役立ちます。

たとえば、例外をスローし始めたサードパーティAPIとやり取りするキューに入れられたジョブがあるとします。例外のスロットリングを行うには、ジョブの `middleware` メソッドから `ThrottlesExceptions` ミドルウェアを返すことができます。通常、このミドルウェアは、時間ベースの試行を実装するジョブと組み合わせて使用する必要があります。

use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
 
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new ThrottlesExceptions(10, 5 * 60)];
}
 
/**
* Determine the time at which the job should timeout.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(30);
}

ミドルウェアが受け入れる最初のコンストラクタ引数は、ジョブがスロットリングされる前にスローできる例外の数であり、2番目のコンストラクタ引数は、ジョブがスロットリングされた後、ジョブが再度試行されるまでに経過する秒数です。上記のコード例では、ジョブが連続して10個の例外をスローした場合、30分の時間制限によって制限されながら、ジョブを再度試行する前に5分間待機します。

ジョブが例外をスローしても、例外の閾値にまだ達していない場合は、通常、ジョブはすぐに再試行されます。ただし、ミドルウェアをジョブにアタッチする際に `backoff` メソッドを呼び出すことで、そのようなジョブを遅延させる分数を指定できます。

use Illuminate\Queue\Middleware\ThrottlesExceptions;
 
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];
}

内部的には、このミドルウェアはLaravelのキャッシュシステムを使用してレート制限を実装し、ジョブのクラス名がキャッシュの「キー」として使用されます。ジョブにミドルウェアをアタッチする際に `by` メソッドを呼び出すことで、このキーを上書きできます。これは、同じサードパーティサービスとやり取りする複数のジョブがあり、それらが共通のスロットリング「バケット」を共有することを望む場合に役立つ場合があります。

use Illuminate\Queue\Middleware\ThrottlesExceptions;
 
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];
}

デフォルトでは、このミドルウェアはすべての例外をスロットリングします。ジョブにミドルウェアをアタッチする際に `when` メソッドを呼び出すことで、この動作を変更できます。例外は、`when` メソッドに提供されたクロージャが `true` を返す場合にのみスロットリングされます。

use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
 
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->when(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}

スロットリングされた例外をアプリケーションの例外ハンドラに報告させたい場合は、ジョブにミドルウェアをアタッチする際に `report` メソッドを呼び出すことができます。オプションとして、`report` メソッドにクロージャを提供できます。指定されたクロージャが `true` を返す場合にのみ例外が報告されます。

use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
 
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->report(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}
lightbulb

Redisを使用している場合は、Redis用に微調整されており、基本的な例外スロットリングミドルウェアよりも効率的な `Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis` ミドルウェアを使用できます。

ジョブのスキップ

`Skip` ミドルウェアを使用すると、ジョブのロジックを変更する必要なく、ジョブをスキップ/削除することを指定できます。`Skip::when` メソッドは、指定された条件が `true` と評価される場合にジョブを削除し、`Skip::unless` メソッドは、条件が `false` と評価される場合にジョブを削除します。

use Illuminate\Queue\Middleware\Skip;
 
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [
Skip::when($someCondition),
];
}

より複雑な条件評価のために、`when` メソッドと `unless` メソッドに `Closure` を渡すこともできます。

use Illuminate\Queue\Middleware\Skip;
 
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [
Skip::when(function (): bool {
return $this->shouldSkip();
}),
];
}

ジョブのディスパッチ

ジョブクラスを作成したら、ジョブ自体の `dispatch` メソッドを使用してディスパッチできます。`dispatch` メソッドに渡された引数は、ジョブのコンストラクタに渡されます。

<?php
 
namespace App\Http\Controllers;
 
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
 
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
 
// ...
 
ProcessPodcast::dispatch($podcast);
 
return redirect('/podcasts');
}
}

ジョブを条件付きでディスパッチする場合は、`dispatchIf` メソッドと `dispatchUnless` メソッドを使用できます。

ProcessPodcast::dispatchIf($accountActive, $podcast);
 
ProcessPodcast::dispatchUnless($accountSuspended, $podcast);

新しいLaravelアプリケーションでは、`sync` ドライバがデフォルトのキュードライバです。このドライバは、現在のリクエストのフォアグラウンドでジョブを同期的に実行します。これは、ローカル開発中に便利なことがよくあります。バックグラウンド処理のためにジョブのキューイングを実際に開始する場合は、アプリケーションの `config/queue.php` 構成ファイル内で別のキュードライバを指定できます。

遅延ディスパッチ

ジョブがキューワーカーによってすぐに処理できるようにしたくない場合は、ジョブをディスパッチする際に `delay` メソッドを使用できます。たとえば、ジョブがディスパッチされてから10分後まで処理に使用できないように指定してみましょう。

<?php
 
namespace App\Http\Controllers;
 
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
 
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
 
// ...
 
ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
 
return redirect('/podcasts');
}
}

場合によっては、ジョブにデフォルトの遅延が構成されている場合があります。この遅延をバイパスしてジョブをすぐに処理するためにディスパッチする必要がある場合は、`withoutDelay` メソッドを使用できます。

ProcessPodcast::dispatch($podcast)->withoutDelay();
exclamation

Amazon SQS キューサービスの最大遅延時間は15分です。

レスポンスがブラウザに送信された後のディスパッチ

または、`dispatchAfterResponse` メソッドを使用すると、WebサーバーがFastCGIを使用している場合、HTTPレスポンスがユーザーのブラウザに送信された後にジョブのディスパッチが遅延されます。これにより、キューに入れられたジョブがまだ実行中であっても、ユーザーはアプリケーションの使用を開始できます。これは、通常、メールの送信など、約1秒かかるジョブにのみ使用する必要があります。これらは現在のHTTPリクエスト内で処理されるため、このようにディスパッチされたジョブは、処理するためにキューワーカーが実行されている必要はありません。

use App\Jobs\SendNotification;
 
SendNotification::dispatchAfterResponse();

`dispatch` ヘルパーに `afterResponse` メソッドをチェーンして、HTTPレスポンスがブラウザに送信された後にクロージャを実行することもできます。

use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;
 
dispatch(function () {
Mail::to('[email protected]')->send(new WelcomeMessage);
})->afterResponse();

同期ディスパッチ

ジョブをすぐに(同期的に)ディスパッチする場合は、`dispatchSync` メソッドを使用できます。このメソッドを使用すると、ジョブはキューに入れられず、現在のプロセス内ですぐに実行されます。

<?php
 
namespace App\Http\Controllers;
 
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
 
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
 
// Create podcast...
 
ProcessPodcast::dispatchSync($podcast);
 
return redirect('/podcasts');
}
}

ジョブとデータベーストランザクション

データベーストランザクション内でジョブをディスパッチすることは完全に問題ありませんが、ジョブが実際に正常に実行できることを確認するために特別な注意を払う必要があります。トランザクション内でジョブをディスパッチすると、親トランザクションがコミットされる前にワーカーによってジョブが処理される可能性があります。このようなことが起こると、データベーストランザクション中にモデルまたはデータベースレコードに行った更新は、データベースにまだ反映されていない可能性があります。さらに、トランザクション内で作成されたモデルまたはデータベースレコードは、データベースに存在しない可能性があります。

幸いなことに、Laravelは、この問題を回避するためのいくつかの方法を提供しています。まず、キュー接続の構成配列で `after_commit` 接続オプションを設定できます。

'redis' => [
'driver' => 'redis',
// ...
'after_commit' => true,
],

`after_commit` オプションが `true` の場合、データベーストランザクション内でジョブをディスパッチできます。ただし、Laravelは、開いている親データベーストランザクションがコミットされるまで、ジョブのディスパッチを待ちます。もちろん、現在開いているデータベーストランザクションがない場合は、ジョブはすぐにディスパッチされます。

トランザクション中に発生した例外のためにトランザクションがロールバックされた場合、そのトランザクション中にディスパッチされたジョブは破棄されます。

lightbulb

`after_commit` 構成オプションを `true` に設定すると、キューに入れられたイベントリスナー、メール送信可能オブジェクト、通知、ブロードキャストイベントも、開いているすべてのデータベーストランザクションがコミットされた後にディスパッチされます。

コミットディスパッチ動作のインライン指定

`after_commit` キュー接続構成オプションを `true` に設定しない場合でも、開いているすべてのデータベーストランザクションがコミットされた後に特定のジョブをディスパッチする必要があることを示すことができます。これを実現するには、ディスパッチ操作に `afterCommit` メソッドをチェーンできます。

use App\Jobs\ProcessPodcast;
 
ProcessPodcast::dispatch($podcast)->afterCommit();

同様に、`after_commit` 構成オプションが `true` に設定されている場合、開いているデータベーストランザクションがコミットされるのを待たずに、特定のジョブをすぐにディスパッチすることを示すことができます。

ProcessPodcast::dispatch($podcast)->beforeCommit();

ジョブチェーン

ジョブチェーンを使用すると、主要なジョブが正常に実行された後に順番に実行する必要があるキューに入れられたジョブのリストを指定できます。シーケンス内の1つのジョブが失敗した場合、残りのジョブは実行されません。キューに入れられたジョブチェーンを実行するには、`Bus` ファサードによって提供される `chain` メソッドを使用できます。Laravelのコマンドバスは、キューに入れられたジョブのディスパッチが構築されているより低レベルのコンポーネントです。

use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;
 
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->dispatch();

ジョブクラスインスタンスをチェーンすることに加えて、クロージャをチェーンすることもできます。

Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
function () {
Podcast::update(/* ... */);
},
])->dispatch();
exclamation

ジョブ内で `$this->delete()` メソッドを使用してジョブを削除しても、チェーンされたジョブの処理は妨げられません。チェーンは、チェーン内のジョブが失敗した場合にのみ実行を停止します。

チェーン接続とキュー

チェーンされたジョブに使用する接続とキューを指定する場合は、`onConnection` メソッドと `onQueue` メソッドを使用できます。これらのメソッドは、キューに入れられたジョブに異なる接続/キューが明示的に割り当てられていない限り、使用するキュー接続とキュー名を指定します。

Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();

チェーンへのジョブの追加

場合によっては、そのチェーン内の別のジョブから既存のジョブチェーンの先頭または末尾にジョブを追加する必要がある場合があります。これは、`prependToChain` メソッドと `appendToChain` メソッドを使用して実行できます。

/**
* Execute the job.
*/
public function handle(): void
{
// ...
 
// Prepend to the current chain, run job immediately after current job...
$this->prependToChain(new TranscribePodcast);
 
// Append to the current chain, run job at end of chain...
$this->appendToChain(new TranscribePodcast);
}

チェーンの失敗

ジョブをチェーンする場合、`catch` メソッドを使用して、チェーン内のジョブが失敗した場合に呼び出すクロージャを指定できます。指定されたコールバックは、ジョブの失敗の原因となった `Throwable` インスタンスを受け取ります。

use Illuminate\Support\Facades\Bus;
use Throwable;
 
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->catch(function (Throwable $e) {
// A job within the chain has failed...
})->dispatch();
exclamation

チェーンコールバックはシリアライズされて後でLaravelキューによって実行されるため、チェーンコールバック内で `$this` 変数を使用しないでください。

キューと接続のカスタマイズ

特定のキューへのディスパッチ

ジョブを異なるキューにプッシュすることで、キューに入れられたジョブを「分類」し、さまざまなキューに割り当てるワーカーの数を優先順位付けることができます。これは、キュー構成ファイルで定義されているように、異なるキュー「接続」にジョブをプッシュするのではなく、単一の接続内の特定のキューにジョブをプッシュすることに注意してください。キューを指定するには、ジョブをディスパッチする際に `onQueue` メソッドを使用します。

<?php
 
namespace App\Http\Controllers;
 
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
 
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
 
// Create podcast...
 
ProcessPodcast::dispatch($podcast)->onQueue('processing');
 
return redirect('/podcasts');
}
}

または、ジョブのコンストラクタ内で `onQueue` メソッドを呼び出すことによって、ジョブのキューを指定できます。

<?php
 
namespace App\Jobs;
 
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
 
class ProcessPodcast implements ShouldQueue
{
use Queueable;
 
/**
* Create a new job instance.
*/
public function __construct()
{
$this->onQueue('processing');
}
}

特定の接続へのディスパッチ

アプリケーションが複数のキュー接続とやり取りする場合は、`onConnection` メソッドを使用して、ジョブをプッシュする接続を指定できます。

<?php
 
namespace App\Http\Controllers;
 
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
 
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
 
// Create podcast...
 
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
 
return redirect('/podcasts');
}
}

`onConnection` メソッドと `onQueue` メソッドを組み合わせてチェーンすることにより、ジョブの接続とキューを指定できます。

ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');

または、ジョブのコンストラクタ内で `onConnection` メソッドを呼び出すことによって、ジョブの接続を指定できます。

<?php
 
namespace App\Jobs;
 
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
 
class ProcessPodcast implements ShouldQueue
{
use Queueable;
 
/**
* Create a new job instance.
*/
public function __construct()
{
$this->onConnection('sqs');
}
}

最大ジョブ試行回数/タイムアウト値の指定

最大試行回数

キューに入れられたジョブの1つでエラーが発生した場合、無期限に再試行させたくない可能性があります。そのため、Laravelは、ジョブを試行できる回数または期間を指定するさまざまな方法を提供しています。

ジョブを試行できる最大回数を指定する1つの方法は、Artisanコマンドラインの `--tries` スイッチを使用することです。これは、処理されているジョブが試行できる回数を指定していない限り、ワーカーによって処理されるすべてのジョブに適用されます。

php artisan queue:work --tries=3

ジョブが最大試行回数を超えると、「失敗した」ジョブと見なされます。失敗したジョブの処理の詳細については、失敗したジョブに関するドキュメントを参照してください。`--tries=0` が `queue:work` コマンドに提供されると、ジョブは無期限に再試行されます。

ジョブクラス自体でジョブを試行できる最大回数を定義することで、より詳細なアプローチをとることができます。最大試行回数がジョブで指定されている場合、コマンドラインで提供された `--tries` の値よりも優先されます。

<?php
 
namespace App\Jobs;
 
class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 5;
}

特定のジョブの最大試行回数を動的に制御する必要がある場合は、ジョブに `tries` メソッドを定義できます。

/**
* Determine number of times the job may be attempted.
*/
public function tries(): int
{
return 5;
}

時間ベースの試行

ジョブが失敗する前に試行できる回数を定義する代わりに、ジョブを試行すべきではない時間を定義できます。これにより、ジョブは指定された時間枠内で何度でも試行できます。ジョブを試行すべきではない時間を定義するには、`retryUntil` メソッドをジョブクラスに追加します。このメソッドは `DateTime` インスタンスを返す必要があります。

use DateTime;
 
/**
* Determine the time at which the job should timeout.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(10);
}
lightbulb

キューに入れられたイベントリスナーに `tries` プロパティまたは `retryUntil` メソッドを定義することもできます。

最大例外数

ジョブを何度も試行することはできますが、再試行が特定の数の未処理の例外によってトリガーされた場合(`release` メソッドによって直接解放されるのではなく)、失敗する必要があることを指定したい場合があります。これを実現するには、ジョブクラスに `maxExceptions` プロパティを定義できます。

<?php
 
namespace App\Jobs;
 
use Illuminate\Support\Facades\Redis;
 
class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 25;
 
/**
* The maximum number of unhandled exceptions to allow before failing.
*
* @var int
*/
public $maxExceptions = 3;
 
/**
* Execute the job.
*/
public function handle(): void
{
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// Lock obtained, process the podcast...
}, function () {
// Unable to obtain lock...
return $this->release(10);
});
}
}

この例では、アプリケーションがRedisロックを取得できない場合、ジョブは10秒間解放され、最大25回まで再試行されます。ただし、ジョブによって処理されない例外が3回スローされた場合、ジョブは失敗します。

タイムアウト

多くの場合、キューに入れられたジョブの処理時間はおおよそ予測できます。このため、Laravelでは「タイムアウト」値を指定できます。デフォルトのタイムアウト値は60秒です。ジョブの処理時間がタイムアウト値で指定された秒数を超えた場合、ジョブを処理しているワーカーはエラーで終了します。通常、ワーカーはサーバーで設定されているプロセスマネージャーによって自動的に再起動されます。

ジョブの実行可能な最大秒数は、Artisanコマンドラインの--timeoutスイッチを使用して指定できます。

php artisan queue:work --timeout=30

ジョブがタイムアウトを繰り返し、最大試行回数を超えた場合、失敗としてマークされます。

ジョブクラス自体で、ジョブが実行される最大秒数を定義することもできます。ジョブでタイムアウトが指定されている場合、コマンドラインで指定されたタイムアウトよりも優先されます。

<?php
 
namespace App\Jobs;
 
class ProcessPodcast implements ShouldQueue
{
/**
* The number of seconds the job can run before timing out.
*
* @var int
*/
public $timeout = 120;
}

ソケットや発信HTTP接続などのIOブロッキングプロセスは、指定されたタイムアウトを尊重しない場合があります。したがって、これらの機能を使用する場合は、常にそれらのAPIを使用してタイムアウトを指定する必要があります。たとえば、Guzzleを使用する場合は、常に接続とリクエストのタイムアウト値を指定する必要があります。

exclamation

ジョブのタイムアウトを指定するには、pcntl PHP拡張機能をインストールする必要があります。さらに、ジョブの「タイムアウト」値は常に、「retry after」値よりも小さくなければなりません。そうでない場合、ジョブが実際の実行またはタイムアウトを完了する前に、再試行される可能性があります。

タイムアウト時の失敗

タイムアウト時にジョブを失敗としてマークする必要があることを示したい場合は、ジョブクラスに$failOnTimeoutプロパティを定義できます。

/**
* Indicate if the job should be marked as failed on timeout.
*
* @var bool
*/
public $failOnTimeout = true;

エラー処理

ジョブの処理中に例外がスローされた場合、ジョブは自動的にキューに戻され、再度試行されます。ジョブは、アプリケーションで許可されている最大試行回数に達するまで、繰り返し解放されます。最大試行回数は、queue:work Artisanコマンドで使用される--triesスイッチで定義されます。あるいは、ジョブクラス自体で最大試行回数を定義することもできます。キューワーカーの実行に関する詳細については、以下を参照してください

ジョブの手動解放

ジョブを手動でキューに戻して、後で再試行できるようにしたい場合があります。これは、releaseメソッドを呼び出すことで実現できます。

/**
* Execute the job.
*/
public function handle(): void
{
// ...
 
$this->release();
}

デフォルトでは、releaseメソッドはジョブをすぐに処理するためにキューに戻します。ただし、整数または日付インスタンスをreleaseメソッドに渡すことで、指定された秒数が経過するまでジョブを処理可能にしないようにキューに指示できます。

$this->release(10);
 
$this->release(now()->addSeconds(10));

ジョブの手動失敗

ジョブを手動で「失敗」としてマークする必要がある場合があります。そのためには、failメソッドを呼び出します。

/**
* Execute the job.
*/
public function handle(): void
{
// ...
 
$this->fail();
}

キャッチした例外のためにジョブを失敗としてマークしたい場合は、例外をfailメソッドに渡すことができます。または、便宜上、文字列のエラーメッセージを渡すことができます。これは、例外に変換されます。

$this->fail($exception);
 
$this->fail('Something went wrong.');
lightbulb

ジョブの失敗に関する詳細については、ジョブの失敗処理に関するドキュメントを参照してください。

ジョブのバッチ処理

Laravelのジョブバッチ機能を使用すると、ジョブのバッチを簡単に実行し、ジョブのバッチの実行が完了した後に何らかの処理を実行できます。始める前に、ジョブバッチに関するメタ情報(完了率など)を含むテーブルを作成するためのデータベースマイグレーションを作成する必要があります。このマイグレーションは、make:queue-batches-table Artisanコマンドを使用して生成できます。

php artisan make:queue-batches-table
 
php artisan migrate

バッチ処理可能なジョブの定義

バッチ処理可能なジョブを定義するには、通常どおりキュー可能なジョブを作成します。ただし、ジョブクラスにIlluminate\Bus\Batchableトレイトを追加する必要があります。このトレイトは、ジョブが実行されている現在のバッチを取得するために使用できるbatchメソッドを提供します。

<?php
 
namespace App\Jobs;
 
use Illuminate\Bus\Batchable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
 
class ImportCsv implements ShouldQueue
{
use Batchable, Queueable;
 
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
// Determine if the batch has been cancelled...
 
return;
}
 
// Import a portion of the CSV file...
}
}

バッチのディスパッチ

ジョブのバッチをディスパッチするには、Busファサードのbatchメソッドを使用します。もちろん、バッチ処理は、完了コールバックと組み合わせて使用​​する場合に最も役立ちます。そのため、thencatchfinallyメソッドを使用して、バッチの完了コールバックを定義できます。これらの各コールバックは、呼び出されるとIlluminate\Bus\Batchインスタンスを受け取ります。この例では、CSVファイルから特定の行数を処理するジョブのバッチをキューに入れると想像しましょう。

use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;
 
$batch = Bus::batch([
new ImportCsv(1, 100),
new ImportCsv(101, 200),
new ImportCsv(201, 300),
new ImportCsv(301, 400),
new ImportCsv(401, 500),
])->before(function (Batch $batch) {
// The batch has been created but no jobs have been added...
})->progress(function (Batch $batch) {
// A single job has completed successfully...
})->then(function (Batch $batch) {
// All jobs completed successfully...
})->catch(function (Batch $batch, Throwable $e) {
// First batch job failure detected...
})->finally(function (Batch $batch) {
// The batch has finished executing...
})->dispatch();
 
return $batch->id;

$batch->idプロパティを介してアクセスできるバッチのIDを使用して、ディスパッチ後にLaravelコマンドバスにバッチに関する情報を問い合わせることができます。

exclamation

バッチコールバックはシリアル化され、後でLaravelキューによって実行されるため、コールバック内では$this変数を使用しないでください。さらに、バッチされたジョブはデータベーストランザクションでラップされているため、暗黙的なコミットをトリガーするデータベースステートメントは、ジョブ内では実行しないでください。

バッチの命名

Laravel HorizonやLaravel Telescopeなどのツールでは、バッチに名前を付けることで、よりユーザーフレンドリーなデバッグ情報をバッチに提供できます。バッチに任意の名前を付けるには、バッチを定義するときにnameメソッドを呼び出します。

$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->name('Import CSV')->dispatch();

バッチ接続とキュー

バッチジョブに使用する接続とキューを指定する場合は、onConnectionメソッドとonQueueメソッドを使用できます。すべてのバッチジョブは、同じ接続とキュー内で実行する必要があります。

$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->onConnection('redis')->onQueue('imports')->dispatch();

チェーンとバッチ

配列内にチェーンジョブを配置することで、バッチ内にチェーンされたジョブのセットを定義できます。たとえば、2つのジョブチェーンを並列に実行し、両方のジョブチェーンの処理が完了したときにコールバックを実行できます。

use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
 
Bus::batch([
[
new ReleasePodcast(1),
new SendPodcastReleaseNotification(1),
],
[
new ReleasePodcast(2),
new SendPodcastReleaseNotification(2),
],
])->then(function (Batch $batch) {
// ...
})->dispatch();

逆に、チェーン内でジョブのバッチを定義することで、ジョブのバッチを実行できます。たとえば、最初に複数のポッドキャストをリリースするジョブのバッチを実行し、次にリリース通知を送信するジョブのバッチを実行できます。

use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;
 
Bus::chain([
new FlushPodcastCache,
Bus::batch([
new ReleasePodcast(1),
new ReleasePodcast(2),
]),
Bus::batch([
new SendPodcastReleaseNotification(1),
new SendPodcastReleaseNotification(2),
]),
])->dispatch();

バッチへのジョブの追加

バッチジョブ内からバッチに追加のジョブを追加することが役立つ場合があります。このパターンは、Webリクエスト中にディスパッチに時間がかかりすぎる可能性のある数千のジョブをバッチ化する必要がある場合に役立ちます。そのため、代わりに、さらに多くのジョブでバッチを水和する「ローダー」ジョブの最初のバッチをディスパッチすることをお勧めします。

$batch = Bus::batch([
new LoadImportBatch,
new LoadImportBatch,
new LoadImportBatch,
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->name('Import Contacts')->dispatch();

この例では、LoadImportBatchジョブを使用して、追加のジョブでバッチを水和します。これを実現するために、ジョブのbatchメソッドを介してアクセスできるバッチインスタンスのaddメソッドを使用できます。

use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;
 
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
return;
}
 
$this->batch()->add(Collection::times(1000, function () {
return new ImportContacts;
}));
}
exclamation

同じバッチに属するジョブ内からのみ、ジョブをバッチに追加できます。

バッチの検査

バッチ完了コールバックに提供されるIlluminate\Bus\Batchインスタンスには、特定のバッチのジョブとのやり取りと検査を支援するためのさまざまなプロパティとメソッドがあります。

// The UUID of the batch...
$batch->id;
 
// The name of the batch (if applicable)...
$batch->name;
 
// The number of jobs assigned to the batch...
$batch->totalJobs;
 
// The number of jobs that have not been processed by the queue...
$batch->pendingJobs;
 
// The number of jobs that have failed...
$batch->failedJobs;
 
// The number of jobs that have been processed thus far...
$batch->processedJobs();
 
// The completion percentage of the batch (0-100)...
$batch->progress();
 
// Indicates if the batch has finished executing...
$batch->finished();
 
// Cancel the execution of the batch...
$batch->cancel();
 
// Indicates if the batch has been cancelled...
$batch->cancelled();

ルートからのバッチの返却

すべてのIlluminate\Bus\BatchインスタンスはJSONシリアライズ可能であるため、アプリケーションのルートのいずれかから直接返して、完了の進捗状況を含むバッチに関する情報を提供するJSONペイロードを取得できます。これにより、アプリケーションのUIでバッチの完了の進捗状況に関する情報を表示することが容易になります。

IDでバッチを取得するには、BusファサードのfindBatchメソッドを使用します。

use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;
 
Route::get('/batch/{batchId}', function (string $batchId) {
return Bus::findBatch($batchId);
});

バッチのキャンセル

特定のバッチの実行をキャンセルする必要がある場合があります。これは、Illuminate\Bus\Batchインスタンスでcancelメソッドを呼び出すことで実現できます。

/**
* Execute the job.
*/
public function handle(): void
{
if ($this->user->exceedsImportLimit()) {
return $this->batch()->cancel();
}
 
if ($this->batch()->cancelled()) {
return;
}
}

前の例で気づいたかもしれませんが、バッチジョブは通常、実行を続行する前に対応するバッチがキャンセルされたかどうかを判断する必要があります。ただし、便宜上、代わりにジョブにSkipIfBatchCancelledミドルウェアを割り当てることができます。その名前が示すように、このミドルウェアは、対応するバッチがキャンセルされている場合、Laravelにジョブを処理しないように指示します。

use Illuminate\Queue\Middleware\SkipIfBatchCancelled;
 
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [new SkipIfBatchCancelled];
}

バッチの失敗

バッチジョブが失敗すると、(割り当てられている場合)catchコールバックが呼び出されます。このコールバックは、バッチ内で最初に失敗したジョブに対してのみ呼び出されます。

失敗の許可

バッチ内のジョブが失敗すると、Laravelは自動的にバッチを「キャンセル済み」としてマークします。必要に応じて、この動作を無効にして、ジョブの失敗によってバッチが自動的にキャンセル済みとしてマークされないようにできます。これは、バッチをディスパッチするときにallowFailuresメソッドを呼び出すことで実現できます。

$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->allowFailures()->dispatch();

失敗したバッチジョブの再試行

便宜上、Laravelは、特定のバッチのすべての失敗したジョブを簡単に再試行できるqueue:retry-batch Artisanコマンドを提供します。queue:retry-batchコマンドは、失敗したジョブを再試行するバッチのUUIDを受け入れます。

php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5

バッチのプルーニング

プルーニングを行わないと、job_batchesテーブルは非常に迅速にレコードが蓄積される可能性があります。これを軽減するには、毎日queue:prune-batches Artisanコマンドを実行するようにスケジュールする必要があります。

use Illuminate\Support\Facades\Schedule;
 
Schedule::command('queue:prune-batches')->daily();

デフォルトでは、24時間以上経過したすべての完了済みのバッチがプルーニングされます。コマンドを呼び出すときにhoursオプションを使用して、バッチデータを保持する期間を決定できます。たとえば、次のコマンドは、48時間以上前に終了したすべてのバッチを削除します。

use Illuminate\Support\Facades\Schedule;
 
Schedule::command('queue:prune-batches --hours=48')->daily();

場合によっては、jobs_batchesテーブルに、正常に完了しなかったバッチ(ジョブが失敗し、そのジョブが正常に再試行されなかったバッチなど)のバッチレコードが蓄積される場合があります。queue:prune-batchesコマンドに、unfinishedオプションを使用してこれらの未完了のバッチレコードをプルーニングするように指示できます。

use Illuminate\Support\Facades\Schedule;
 
Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();

同様に、jobs_batchesテーブルには、キャンセルされたバッチのバッチレコードも蓄積される場合があります。queue:prune-batchesコマンドに、cancelledオプションを使用してこれらのキャンセルされたバッチレコードをプルーニングするように指示できます。

use Illuminate\Support\Facades\Schedule;
 
Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();

DynamoDBへのバッチの保存

Laravelは、リレーショナルデータベースの代わりにDynamoDBにバッチメタ情報を保存することもサポートしています。ただし、すべてのバッチレコードを保存するためのDynamoDBテーブルを手動で作成する必要があります。

通常、このテーブルの名前はjob_batchesにする必要がありますが、アプリケーションのqueue構成ファイル内のqueue.batching.table構成値に基づいてテーブルに名前を付ける必要があります。

DynamoDBバッチテーブルの構成

job_batchesテーブルには、applicationという名前の文字列のプライマリパーティションキーと、idという名前の文字列のプライマリソートキーが必要です。キーのapplication部分は、アプリケーションのapp構成ファイル内のname構成値で定義されているアプリケーション名を含みます。アプリケーション名はDynamoDBテーブルのキーの一部であるため、同じテーブルを使用して複数のLaravelアプリケーションのジョブバッチを保存できます。

さらに、バッチの自動削除 を利用したい場合は、テーブルにttl属性を定義できます。

DynamoDBの設定

次に、LaravelアプリケーションがAmazon DynamoDBと通信できるように、AWS SDKをインストールします。

composer require aws/aws-sdk-php

その後、queue.batching.driver設定オプションの値をdynamodbに設定します。さらに、batching設定配列内にkeysecretregion設定オプションを定義する必要があります。これらのオプションは、AWSへの認証に使用されます。dynamodbドライバを使用する場合は、queue.batching.database設定オプションは不要です。

'batching' => [
'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
],

DynamoDBでのバッチの削除

DynamoDBを使用してジョブバッチ情報を保存する場合、リレーショナルデータベースに保存されたバッチを削除するために使用される一般的な削除コマンドは機能しません。代わりに、DynamoDBのネイティブTTL機能 を使用して、古いバッチのレコードを自動的に削除できます。

DynamoDBテーブルにttl属性を定義した場合、Laravelにバッチレコードの削除方法を指示する設定パラメータを定義できます。queue.batching.ttl_attribute設定値はTTLを保持する属性の名前を定義し、queue.batching.ttl設定値は、レコードが最後に更新されてからバッチレコードをDynamoDBテーブルから削除できるまでの秒数を定義します。

'batching' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
'ttl_attribute' => 'ttl',
'ttl' => 60 * 60 * 24 * 7, // 7 days...
],

クロージャのキューイング

キューにジョブクラスをディスパッチする代わりに、クロージャをディスパッチすることもできます。これは、現在のリクエストサイクルの外で実行する必要がある迅速で簡単なタスクに最適です。クロージャをキューにディスパッチする場合、クロージャのコード内容は暗号的に署名されるため、転送中に変更されることはありません。

$podcast = App\Podcast::find(1);
 
dispatch(function () use ($podcast) {
$podcast->publish();
});

設定された再試行回数 をすべて使い果たした後に、キューに入れられたクロージャが正常に完了しなかった場合に実行されるクロージャを、catchメソッドを使用して指定できます。

use Throwable;
 
dispatch(function () use ($podcast) {
$podcast->publish();
})->catch(function (Throwable $e) {
// This job has failed...
});
exclamation

catchコールバックはシリアル化されて後でLaravelキューによって実行されるため、catchコールバック内で$this変数を使用しないでください。

キューワーカーの実行

queue:workコマンド

Laravelには、キューワーカーを起動し、新しいジョブがキューにプッシュされたら処理するArtisanコマンドが含まれています。queue:work Artisanコマンドを使用してワーカーを実行できます。queue:workコマンドが開始されると、手動で停止するか、ターミナルを閉じない限り、実行を続けます。

php artisan queue:work
lightbulb

queue:workプロセスをバックグラウンドで永続的に実行するには、Supervisorなどのプロセスモニターを使用して、キューワーカーの実行が停止しないようにする必要があります。

処理されたジョブIDをコマンドの出力に含めたい場合は、queue:workコマンドを呼び出す際に-vフラグを含めることができます。

php artisan queue:work -v

キューワーカーは長期間実行されるプロセスであり、起動されたアプリケーションの状態をメモリに保存することに注意してください。そのため、開始後にコードベースが変更されても、それらの変更に気付くことはありません。したがって、デプロイプロセス中に、キューワーカーを再起動 するようにしてください。さらに、アプリケーションによって作成または変更された静的な状態は、ジョブ間で自動的にリセットされることはありません。

または、queue:listenコマンドを実行することもできます。queue:listenコマンドを使用する場合、更新されたコードをリロードしたり、アプリケーションの状態をリセットしたりする際にワーカーを手動で再起動する必要はありませんが、このコマンドはqueue:workコマンドよりもはるかに効率性が低いです。

php artisan queue:listen

複数のキューワーカーの実行

複数のワーカーをキューに割り当ててジョブを同時に処理するには、複数のqueue:workプロセスを開始するだけです。これは、ターミナルの複数のタブでローカルに行うことも、プロセスマネージャーの設定を使用して本番環境で行うこともできます。Supervisorを使用する場合は、numprocs設定値を使用できます。

接続とキューの指定

ワーカーが使用するキュー接続を指定することもできます。workコマンドに渡される接続名は、config/queue.php設定ファイルに定義されている接続のいずれかに対応している必要があります。

php artisan queue:work redis

デフォルトでは、queue:workコマンドは、特定の接続上のデフォルトキューのジョブのみを処理します。ただし、特定の接続の特定のキューのみを処理するように、キューワーカーをさらにカスタマイズできます。たとえば、すべてのメールがredisキュー接続のemailsキューで処理される場合、そのキューのみを処理するワーカーを起動するには、次のコマンドを実行できます。

php artisan queue:work redis --queue=emails

指定された数のジョブの処理

--onceオプションを使用して、ワーカーにキューから単一のジョブのみを処理するように指示できます。

php artisan queue:work --once

--max-jobsオプションを使用して、ワーカーに指定された数のジョブを処理してから終了するように指示できます。このオプションは、Supervisorと組み合わせて使用​​すると、ワーカーが指定された数のジョブを処理した後に自動的に再起動され、蓄積されたメモリを解放するため、役立ちます。

php artisan queue:work --max-jobs=1000

キューに入れられたすべてのジョブを処理してから終了する

--stop-when-emptyオプションを使用して、ワーカーにすべてのジョブを処理してから正常に終了するように指示できます。このオプションは、キューが空になった後にコンテナをシャットダウンしたい場合、Dockerコンテナ内でLaravelキューを処理する場合に役立ちます。

php artisan queue:work --stop-when-empty

指定された秒数のジョブの処理

--max-timeオプションを使用して、ワーカーに指定された秒数のジョブを処理してから終了するように指示できます。このオプションは、Supervisorと組み合わせて使用​​すると、ワーカーが指定された時間量のジョブを処理した後に自動的に再起動され、蓄積されたメモリを解放するため、役立ちます。

# Process jobs for one hour and then exit...
php artisan queue:work --max-time=3600

ワーカーのスリープ時間

キューでジョブが利用可能な場合、ワーカーはジョブ間の遅延なしでジョブの処理を続けます。ただし、sleepオプションは、利用可能なジョブがない場合にワーカーが「スリープ」する秒数を決定します。もちろん、スリープ中は、ワーカーは新しいジョブを処理しません。

php artisan queue:work --sleep=3

メンテナンスモードとキュー

アプリケーションがメンテナンスモードになっている間は、キューに入れられたジョブは処理されません。アプリケーションがメンテナンスモードから外れると、ジョブは通常どおり処理されます。

メンテナンスモードが有効になっている場合でもキューワーカーにジョブを処理させるには、--forceオプションを使用できます。

php artisan queue:work --force

リソースの考慮事項

デーモンキューワーカーは、各ジョブを処理する前にフレームワークを「再起動」しません。そのため、各ジョブの完了後に重いリソースを解放する必要があります。たとえば、GDライブラリで画像操作を行う場合は、画像の処理が完了したらimagedestroyを使用してメモリを解放する必要があります。

キューの優先度

キューの処理方法の優先順位を付けたい場合があります。たとえば、config/queue.php設定ファイルで、redis接続のデフォルトのqueuelowに設定できます。ただし、場合によっては、次のようにhigh優先順位のキューにジョブをプッシュしたい場合があります。

dispatch((new Job)->onQueue('high'));

highキューのすべてのジョブが処理されたことを確認してから、lowキューのジョブに進むワーカーを起動するには、コンマ区切りのキュー名のリストをworkコマンドに渡します。

php artisan queue:work --queue=high,low

キューワーカーとデプロイ

キューワーカーは長期間実行されるプロセスであるため、再起動されない限り、コードの変更に気付くことはありません。そのため、キューワーカーを使用するアプリケーションをデプロイする最も簡単な方法は、デプロイプロセス中にワーカーを再起動することです。queue:restartコマンドを実行することで、すべてのワーカーを正常に再起動できます。

php artisan queue:restart

このコマンドは、すべてのキューワーカーに、現在のジョブの処理が完了した後に正常に終了するように指示するため、既存のジョブが失われることはありません。queue:restartコマンドが実行されるとキューワーカーが終了するため、Supervisorなどのプロセスマネージャーを実行して、キューワーカーを自動的に再起動する必要があります。

lightbulb

キューはキャッシュを使用して再起動シグナルを保存するため、この機能を使用する前に、アプリケーションでキャッシュドライバが正しく構成されていることを確認する必要があります。

ジョブの期限切れとタイムアウト

ジョブの期限切れ

config/queue.php設定ファイルでは、各キュー接続でretry_afterオプションが定義されています。このオプションは、処理中のジョブの再試行前にキュー接続が待機する秒数を指定します。たとえば、retry_afterの値が90に設定されている場合、ジョブは解放または削除されずに90秒間処理されていると、キューに戻されます。通常、retry_afterの値は、ジョブの処理が完了するのに合理的にかかる最大秒数に設定する必要があります。

exclamation

retry_after値が含まれていない唯一のキュー接続はAmazon SQSです。SQSは、AWSコンソールで管理されているデフォルトの表示タイムアウトに基づいてジョブを再試行します。

ワーカーのタイムアウト

queue:work Artisanコマンドは--timeoutオプションを公開しています。デフォルトでは、--timeoutの値は60秒です。ジョブがタイムアウト値で指定された秒数よりも長く処理されている場合、ジョブを処理しているワーカーはエラーで終了します。通常、ワーカーはサーバーで設定されたプロセスマネージャーによって自動的に再起動されます。

php artisan queue:work --timeout=60

retry_after設定オプションと--timeoutCLIオプションは異なりますが、ジョブが失われないようにし、ジョブが一度だけ正常に処理されるように連携して動作します。

exclamation

--timeoutの値は、常にretry_after設定値よりも数秒短くなっている必要があります。これにより、フリーズしたジョブを処理しているワーカーは、ジョブが再試行される前に常に終了されます。--timeoutオプションがretry_after設定値よりも長い場合、ジョブが2回処理される可能性があります。

Supervisorの設定

本番環境では、queue:workプロセスを実行し続ける方法が必要です。ワーカーのタイムアウト超過やqueue:restartコマンドの実行など、さまざまな理由でqueue:workプロセスが停止する可能性があります。

このため、queue:workプロセスの終了を検出し、自動的に再起動できるプロセスモニターを構成する必要があります。さらに、プロセスモニターを使用すると、同時に実行するqueue:workプロセスの数を指定できます。SupervisorはLinux環境で一般的に使用されるプロセスモニターであり、次のドキュメントでその構成方法について説明します。

Supervisorのインストール

SupervisorはLinuxオペレーティングシステムのプロセスモニターであり、失敗した場合にqueue:workプロセスを自動的に再起動します。UbuntuにSupervisorをインストールするには、次のコマンドを使用できます。

sudo apt-get install supervisor
lightbulb

Supervisorの構成と管理が困難な場合は、Laravel Forgeを使用することを検討してください。Laravel Forgeは、本番環境のLaravelプロジェクトにSupervisorを自動的にインストールして構成します。

Supervisorの設定

Supervisorの設定ファイルは、通常、/etc/supervisor/conf.dディレクトリに保存されます。このディレクトリ内では、プロセスの監視方法をSupervisorに指示する任意の数の設定ファイルを作成できます。たとえば、queue:workプロセスを起動して監視するlaravel-worker.confファイルを作成しましょう。

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600

この例では、numprocsディレクティブにより、Supervisorは8つのqueue:workプロセスを実行し、すべてを監視して、失敗した場合に自動的に再起動します。目的のキュー接続とワーカーオプションを反映するように、設定のcommandディレクティブを変更する必要があります。

exclamation

stopwaitsecsの値が、最も長く実行されるジョブによって消費される秒数よりも大きくなっていることを確認する必要があります。そうでない場合、Supervisorはジョブが処理される前にジョブを強制終了する可能性があります。

Supervisorの起動

設定ファイルの作成が完了したら、以下のコマンドを使用してSupervisorの設定を更新し、プロセスを開始できます。

sudo supervisorctl reread
 
sudo supervisorctl update
 
sudo supervisorctl start "laravel-worker:*"

Supervisorの詳細については、Supervisorドキュメントを参照してください。

失敗したジョブの処理

キューに入れられたジョブが失敗することがあります。心配しないでください。計画通りにいかないこともあります!Laravelには、ジョブの最大試行回数を指定する便利な方法が用意されています。非同期ジョブの試行回数がこの回数を超えると、`failed_jobs`データベーステーブルに挿入されます。同期的にディスパッチされたジョブが失敗した場合、このテーブルには保存されず、例外はアプリケーションによってすぐに処理されます。

`failed_jobs`テーブルを作成するためのマイグレーションは、通常、新しいLaravelアプリケーションに既に含まれています。ただし、アプリケーションにこのテーブルのマイグレーションが含まれていない場合は、`make:queue-failed-table`コマンドを使用してマイグレーションを作成できます。

php artisan make:queue-failed-table
 
php artisan migrate

キューワーカープロセスを実行する際に、`queue:work`コマンドの`--tries`スイッチを使用して、ジョブの最大試行回数を指定できます。`--tries`オプションに値を指定しない場合、ジョブは1回のみ、またはジョブクラスの`$tries`プロパティで指定された回数だけ試行されます。

php artisan queue:work redis --tries=3

`--backoff`オプションを使用して、例外が発生したジョブを再試行する前にLaravelが待機する秒数を指定できます。デフォルトでは、ジョブはすぐにキューに戻されるため、再度試行できます。

php artisan queue:work redis --tries=3 --backoff=3

ジョブごとに、例外が発生したジョブを再試行する前にLaravelが待機する秒数を設定したい場合は、ジョブクラスに`backoff`プロパティを定義します。

/**
* The number of seconds to wait before retrying the job.
*
* @var int
*/
public $backoff = 3;

ジョブのバックオフ時間を決定するためのより複雑なロジックが必要な場合は、ジョブクラスに`backoff`メソッドを定義できます。

/**
* Calculate the number of seconds to wait before retrying the job.
*/
public function backoff(): int
{
return 3;
}

`backoff`メソッドからバックオフ値の配列を返すことで、簡単に「指数関数的」なバックオフを設定できます。この例では、最初の再試行の遅延は1秒、2回目の再試行は5秒、3回目の再試行は10秒、それ以降の再試行は残りの試行回数に関わらず10秒になります。

/**
* Calculate the number of seconds to wait before retrying the job.
*
* @return array<int, int>
*/
public function backoff(): array
{
return [1, 5, 10];
}

失敗したジョブの後処理

特定のジョブが失敗した場合、ユーザーにアラートを送信したり、ジョブによって部分的に完了したアクションをロールバックしたりすることがあります。これを実現するには、ジョブクラスに`failed`メソッドを定義できます。ジョブの失敗原因となった`Throwable`インスタンスが`failed`メソッドに渡されます。

<?php
 
namespace App\Jobs;
 
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Throwable;
 
class ProcessPodcast implements ShouldQueue
{
use Queueable;
 
/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
) {}
 
/**
* Execute the job.
*/
public function handle(AudioProcessor $processor): void
{
// Process uploaded podcast...
}
 
/**
* Handle a job failure.
*/
public function failed(?Throwable $exception): void
{
// Send user notification of failure, etc...
}
}
exclamation

`failed`メソッドを呼び出す前に、ジョブの新しいインスタンスがインスタンス化されます。そのため、`handle`メソッド内で発生した可能性のあるクラスプロパティの変更は失われます。

失敗したジョブの再試行

`failed_jobs`データベーステーブルに挿入されたすべての失敗したジョブを表示するには、`queue:failed` Artisanコマンドを使用できます。

php artisan queue:failed

`queue:failed`コマンドは、ジョブID、接続、キュー、失敗時間、およびジョブに関するその他の情報を一覧表示します。ジョブIDを使用して、失敗したジョブを再試行できます。たとえば、IDが`ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece`の失敗したジョブを再試行するには、次のコマンドを実行します。

php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece

必要に応じて、複数IDをコマンドに渡すことができます。

php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d

特定のキューのすべての失敗したジョブを再試行することもできます。

php artisan queue:retry --queue=name

すべての失敗したジョブを再試行するには、`queue:retry`コマンドを実行し、IDとして`all`を渡します。

php artisan queue:retry all

失敗したジョブを削除する場合は、`queue:forget`コマンドを使用できます。

php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
lightbulb

Horizonを使用する場合は、`queue:forget`コマンドではなく、`horizon:forget`コマンドを使用して失敗したジョブを削除する必要があります。

`failed_jobs`テーブルからすべての失敗したジョブを削除するには、`queue:flush`コマンドを使用できます。

php artisan queue:flush

欠落しているモデルの無視

Eloquentモデルをジョブにインジェクションする場合、モデルはキューに配置される前に自動的にシリアル化され、ジョブがワーカーによって処理されるときにデータベースから再取得されます。ただし、ジョブがワーカーによる処理を待っている間にモデルが削除された場合、ジョブは`ModelNotFoundException`で失敗する可能性があります。

便宜上、ジョブの`deleteWhenMissingModels`プロパティを`true`に設定することで、欠落しているモデルを持つジョブを自動的に削除することを選択できます。このプロパティが`true`に設定されている場合、Laravelは例外を発生させることなく、ジョブを静かに破棄します。

/**
* Delete the job if its models no longer exist.
*
* @var bool
*/
public $deleteWhenMissingModels = true;

失敗したジョブのプルーニング

アプリケーションの`failed_jobs`テーブルのレコードを削除するには、`queue:prune-failed` Artisanコマンドを呼び出します。

php artisan queue:prune-failed

デフォルトでは、24時間以上経過したすべての失敗したジョブレコードが削除されます。コマンドに`--hours`オプションを指定すると、過去N時間以内に挿入された失敗したジョブレコードのみが保持されます。たとえば、次のコマンドは、48時間以上前に挿入されたすべての失敗したジョブレコードを削除します。

php artisan queue:prune-failed --hours=48

DynamoDBへの失敗したジョブの保存

Laravelは、リレーショナルデータベーステーブルの代わりにDynamoDBに失敗したジョブレコードを保存することもサポートしています。ただし、すべての失敗したジョブレコードを保存するためのDynamoDBテーブルを手動で作成する必要があります。通常、このテーブルの名前は`failed_jobs`にする必要がありますが、アプリケーションの`queue`設定ファイル内の`queue.failed.table`設定値に基づいてテーブル名を付ける必要があります。

`failed_jobs`テーブルには、`application`という名前の文字列のプライマリパーティションキーと、`uuid`という名前の文字列のプライマリソートキーが必要です。キーの`application`部分は、アプリケーションの`app`設定ファイル内の`name`設定値で定義されているアプリケーション名を含みます。アプリケーション名はDynamoDBテーブルのキーの一部であるため、同じテーブルを使用して複数のLaravelアプリケーションの失敗したジョブを保存できます。

さらに、LaravelアプリケーションがAmazon DynamoDBと通信できるように、AWS SDKをインストールしてください。

composer require aws/aws-sdk-php

次に、`queue.failed.driver`設定オプションの値を`dynamodb`に設定します。さらに、失敗したジョブの設定配列内に`key`、`secret`、`region`設定オプションを定義する必要があります。これらのオプションは、AWSとの認証に使用されます。`dynamodb`ドライバを使用する場合、`queue.failed.database`設定オプションは不要です。

'failed' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'failed_jobs',
],

失敗したジョブの保存の無効化

`queue.failed.driver`設定オプションの値を`null`に設定することで、保存せずに失敗したジョブを破棄するようにLaravelに指示できます。通常、これは`QUEUE_FAILED_DRIVER`環境変数を使用して行うことができます。

QUEUE_FAILED_DRIVER=null

失敗したジョブのイベント

ジョブが失敗したときに呼び出されるイベントリスナーを登録したい場合は、`Queue`ファサードの`failing`メソッドを使用できます。たとえば、Laravelに含まれる`AppServiceProvider`の`boot`メソッドからこのイベントにクロージャをアタッチできます。

<?php
 
namespace App\Providers;
 
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;
 
class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*/
public function register(): void
{
// ...
}
 
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
}

キューからのジョブのクリア

lightbulb

Horizonを使用する場合は、`queue:clear`コマンドではなく、`horizon:clear`コマンドを使用してキューからジョブをクリアする必要があります。

デフォルト接続のデフォルトキューからすべてのジョブを削除する場合は、`queue:clear` Artisanコマンドを使用して削除できます。

php artisan queue:clear

特定の接続とキューからジョブを削除するには、`connection`引数と`queue`オプションを指定することもできます。

php artisan queue:clear redis --queue=emails
exclamation

キューからのジョブのクリアは、SQS、Redis、およびデータベースキュードライバでのみ使用できます。さらに、SQSメッセージの削除プロセスには最大60秒かかるため、キューをクリアした後に最大60秒間SQSキューに送信されたジョブも削除される可能性があります。

キューの監視

キューに突然多くのジョブが到着すると、キューがオーバーロードされ、ジョブの完了待ち時間が長くなる可能性があります。必要に応じて、Laravelはキューのジョブ数が指定されたしきい値を超えたときに警告できます。

開始するには、1分ごとに実行されるように`queue:monitor`コマンドをスケジュールする必要があります。このコマンドは、監視するキューの名前と、目的のジョブ数のしきい値を受け入れます。

php artisan queue:monitor redis:default,redis:deployments --max=100

このコマンドをスケジュールするだけでは、キューのオーバーロード状態を警告する通知をトリガーするのに十分ではありません。コマンドがしきい値を超えるジョブ数を持つキューを検出すると、`Illuminate\Queue\Events\QueueBusy`イベントがディスパッチされます。アプリケーションの`AppServiceProvider`内でこのイベントをリッスンして、自分または開発チームに通知を送信できます。

use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;
 
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Event::listen(function (QueueBusy $event) {
Notification::route('mail', '[email protected]')
->notify(new QueueHasLongWaitTime(
$event->connection,
$event->queue,
$event->size
));
});
}

テスト

ジョブをディスパッチするコードをテストする際、ジョブ自体を実際に実行しないようにLaravelに指示することがあります。ジョブのコードは、ジョブをディスパッチするコードとは別に直接テストできるためです。もちろん、ジョブ自体をテストするには、ジョブインスタンスをインスタンス化し、テストで直接`handle`メソッドを呼び出すことができます。

キューにプッシュされたジョブを実際に実行しないようにするには、`Queue`ファサードの`fake`メソッドを使用できます。`Queue`ファサードの`fake`メソッドを呼び出した後、アプリケーションがキューにジョブをプッシュしようとしたことをアサートできます。

<?php
 
use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
 
test('orders can be shipped', function () {
Queue::fake();
 
// Perform order shipping...
 
// Assert that no jobs were pushed...
Queue::assertNothingPushed();
 
// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);
 
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
 
// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);
 
// Assert that a Closure was pushed to the queue...
Queue::assertClosurePushed();
 
// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
});
<?php
 
namespace Tests\Feature;
 
use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;
 
class ExampleTest extends TestCase
{
public function test_orders_can_be_shipped(): void
{
Queue::fake();
 
// Perform order shipping...
 
// Assert that no jobs were pushed...
Queue::assertNothingPushed();
 
// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);
 
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
 
// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);
 
// Assert that a Closure was pushed to the queue...
Queue::assertClosurePushed();
 
// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
}
}

特定の「真偽テスト」に合格するジョブがプッシュされたことをアサートするには、`assertPushed`メソッドまたは`assertNotPushed`メソッドにクロージャを渡すことができます。指定された真偽テストに合格するジョブが少なくとも1つプッシュされた場合、アサーションは成功します。

Queue::assertPushed(function (ShipOrder $job) use ($order) {
return $job->order->id === $order->id;
});

ジョブのサブセットのフェイク

他のジョブは通常どおり実行したまま、特定のジョブだけを偽装する必要がある場合は、偽装するジョブのクラス名を`fake`メソッドに渡すことができます。

test('orders can be shipped', function () {
Queue::fake([
ShipOrder::class,
]);
 
// Perform order shipping...
 
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
});
public function test_orders_can_be_shipped(): void
{
Queue::fake([
ShipOrder::class,
]);
 
// Perform order shipping...
 
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
}

指定されたジョブのセットを除くすべてのジョブを偽装するには、`except`メソッドを使用できます。

Queue::fake()->except([
ShipOrder::class,
]);

ジョブチェーンのテスト

ジョブチェーンをテストするには、`Bus`ファサードの偽装機能を使用する必要があります。`Bus`ファサードの`assertChained`メソッドを使用して、ジョブチェーンがディスパッチされたことをアサートできます。`assertChained`メソッドは、最初の引数としてジョブチェーンの配列を受け入れます。

use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;
 
Bus::fake();
 
// ...
 
Bus::assertChained([
ShipOrder::class,
RecordShipment::class,
UpdateInventory::class
]);

上記の例にあるように、チェーンされたジョブの配列はジョブのクラス名の配列にすることができます。しかし、実際のジョブインスタンスの配列を提供することもできます。そうした場合、Laravelは、ジョブインスタンスが同じクラスであり、アプリケーションによってディスパッチされたチェーンされたジョブのプロパティ値が同じであることを保証します。

Bus::assertChained([
new ShipOrder,
new RecordShipment,
new UpdateInventory,
]);

ジョブがジョブチェーンなしでプッシュされたことを確認するには、`assertDispatchedWithoutChain`メソッドを使用できます。

Bus::assertDispatchedWithoutChain(ShipOrder::class);

チェーンの変更テスト

チェーンされたジョブが既存のチェーンにジョブを追加または削除する場合、ジョブの`assertHasChain`メソッドを使用して、ジョブが期待される残りのジョブチェーンを持っていることを確認できます。

$job = new ProcessPodcast;
 
$job->handle();
 
$job->assertHasChain([
new TranscribePodcast,
new OptimizePodcast,
new ReleasePodcast,
]);

`assertDoesntHaveChain`メソッドは、ジョブの残りのチェーンが空であることを確認するために使用できます。

$job->assertDoesntHaveChain();

チェーンされたバッチのテスト

ジョブチェーンにジョブのバッチが含まれている場合、チェーンアサーション内に`Bus::chainedBatch`定義を挿入することで、チェーンされたバッチが期待どおりであることを確認できます。

use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
 
Bus::assertChained([
new ShipOrder,
Bus::chainedBatch(function (PendingBatch $batch) {
return $batch->jobs->count() === 3;
}),
new UpdateInventory,
]);

ジョブバッチのテスト

`Bus`ファサードの`assertBatched`メソッドを使用して、ジョブのバッチがディスパッチされたことを確認できます。`assertBatched`メソッドに渡されるクロージャは、`Illuminate\Bus\PendingBatch`のインスタンスを受け取ります。これは、バッチ内のジョブを検査するために使用できます。

use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
 
Bus::fake();
 
// ...
 
Bus::assertBatched(function (PendingBatch $batch) {
return $batch->name == 'import-csv' &&
$batch->jobs->count() === 10;
});

`assertBatchCount`メソッドを使用して、特定の数のバッチがディスパッチされたことを確認できます。

Bus::assertBatchCount(3);

`assertNothingBatched`を使用して、バッチがディスパッチされなかったことを確認できます。

Bus::assertNothingBatched();

ジョブ/バッチの相互作用テスト

さらに、個々のジョブとその基盤となるバッチとの相互作用をテストする必要がある場合があります。たとえば、ジョブがバッチの処理をキャンセルしたかどうかをテストする必要がある場合があります。これを実現するには、`withFakeBatch`メソッドを使用してジョブに偽のバッチを割り当てる必要があります。`withFakeBatch`メソッドは、ジョブインスタンスと偽のバッチを含むタプルを返します。

[$job, $batch] = (new ShipOrder)->withFakeBatch();
 
$job->handle();
 
$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);

ジョブ/キューの相互作用のテスト

キューされたジョブが自身をキューに戻すことをテストする必要がある場合があります。または、ジョブが自身を削除したことをテストする必要がある場合があります。これらのキューの相互作用をテストするには、ジョブをインスタンス化し、`withFakeQueueInteractions`メソッドを呼び出します。

ジョブのキューの相互作用が偽装されたら、ジョブで`handle`メソッドを呼び出すことができます。ジョブを呼び出した後、`assertReleased`、`assertDeleted`、`assertNotDeleted`、`assertFailed`、`assertNotFailed`メソッドを使用して、ジョブのキューの相互作用に対してアサーションを行うことができます。

use App\Jobs\ProcessPodcast;
 
$job = (new ProcessPodcast)->withFakeQueueInteractions();
 
$job->handle();
 
$job->assertReleased(delay: 30);
$job->assertDeleted();
$job->assertNotDeleted();
$job->assertFailed();
$job->assertNotFailed();

ジョブイベント

`Queue` ファサードの`before`メソッドと`after`メソッドを使用すると、キューされたジョブが処理される前または後に実行されるコールバックを指定できます。これらのコールバックは、追加のログを実行したり、ダッシュボードの統計をインクリメントしたりするのに最適な機会です。通常、これらのメソッドはサービスプロバイダの`boot`メソッドから呼び出す必要があります。たとえば、Laravelに含まれている`AppServiceProvider`を使用できます。

<?php
 
namespace App\Providers;
 
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
 
class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*/
public function register(): void
{
// ...
}
 
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
 
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
}

`Queue` ファサードの`looping`メソッドを使用すると、ワーカーがキューからジョブのフェッチを試行する前に実行されるコールバックを指定できます。たとえば、以前に失敗したジョブによって開かれたままになっているトランザクションをロールバックするクロージャを登録できます。

use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;
 
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});