Skip to content

队列

介绍

Laravel 队列提供了一个统一的 API,支持多种不同的队列后端,如 Beanstalk、Amazon SQS、Redis,甚至是关系数据库。队列允许您将耗时的任务(如发送电子邮件)的处理推迟到稍后进行。推迟这些耗时任务可以显著加快应用程序的 Web 请求速度。

队列配置文件存储在 config/queue.php 中。在此文件中,您将找到框架中包含的每个队列驱动程序的连接配置,其中包括数据库、BeanstalkdAmazon SQSRedis 和一个同步驱动程序(用于本地使用),该驱动程序将立即执行作业。还包括一个 null 队列驱动程序,它只会丢弃排队的作业。

连接与队列

在开始使用 Laravel 队列之前,了解“连接”和“队列”之间的区别很重要。在您的 config/queue.php 配置文件中,有一个 connections 配置选项。此选项定义了与后端服务(如 Amazon SQS、Beanstalk 或 Redis)的特定连接。然而,任何给定的队列连接可能有多个“队列”,可以被视为不同的作业堆栈或堆。

请注意,queue 配置文件中的每个连接配置示例都包含一个 queue 属性。这是作业发送到给定连接时将被调度到的默认队列。换句话说,如果您在调度作业时没有明确定义它应该被调度到哪个队列,作业将被放置在连接配置中定义的 queue 属性所指定的队列中:

php
// 这个作业被发送到默认队列...
dispatch(new Job);

// 这个作业被发送到 "emails" 队列...
dispatch((new Job)->onQueue('emails'));

某些应用程序可能不需要将作业推送到多个队列,而是更喜欢拥有一个简单的队列。然而,将作业推送到多个队列对于希望优先处理或分段处理作业的应用程序特别有用,因为 Laravel 队列工作者允许您通过优先级指定它应该处理哪些队列。例如,如果您将作业推送到 high 队列,您可以运行一个工作者,给予它们更高的处理优先级:

php
php artisan queue:work --queue=high,default

驱动程序先决条件

数据库

为了使用 database 队列驱动程序,您需要一个数据库表来保存作业。要生成创建此表的迁移,请运行 queue:table Artisan 命令。创建迁移后,您可以使用 migrate 命令迁移数据库:

php
php artisan queue:table

php artisan migrate

Redis

为了使用 redis 队列驱动程序,您应该在 config/database.php 配置文件中配置一个 Redis 数据库连接。

如果您的 Redis 队列连接使用 Redis 集群,您的队列名称必须包含一个 key hash tag。这是为了确保给定队列的所有 Redis 键都放置在同一个哈希槽中:

php
'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => '{default}',
    'retry_after' => 90,
],

其他驱动程序先决条件

以下依赖项是列出的队列驱动程序所需的:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~3.0
  • Redis: predis/predis ~1.0

创建作业

生成作业类

默认情况下,应用程序的所有可排队作业都存储在 app/Jobs 目录中。如果 app/Jobs 目录不存在,当您运行 make:job Artisan 命令时,它将被创建。您可以使用 Artisan CLI 生成一个新的排队作业:

php
php artisan make:job SendReminderEmail

生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue 接口,指示 Laravel 该作业应被推送到队列以异步运行。

类结构

作业类非常简单,通常只包含一个 handle 方法,该方法在作业被队列处理时调用。让我们从一个示例作业类开始。在此示例中,我们假设我们管理一个播客发布服务,并且需要在发布之前处理上传的播客文件:

php
<?php

namespace App\Jobs;

use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * 创建一个新的作业实例。
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * 执行作业。
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // 处理上传的播客...
    }
}

在此示例中,请注意我们能够将 Eloquent 模型 直接传递到排队作业的构造函数中。由于作业使用的 SerializesModels trait,Eloquent 模型将在作业处理时被优雅地序列化和反序列化。如果您的排队作业在其构造函数中接受 Eloquent 模型,则只有模型的标识符会被序列化到队列中。当作业实际处理时,队列系统将自动从数据库中重新检索完整的模型实例。这对您的应用程序来说是完全透明的,并防止了序列化完整 Eloquent 模型实例时可能出现的问题。

handle 方法在作业被队列处理时调用。请注意,我们能够在作业的 handle 方法上进行依赖注入。Laravel 服务容器 会自动注入这些依赖项。

exclamation

二进制数据(如原始图像内容)应在传递给排队作业之前通过 base64_encode 函数进行处理。否则,作业在放入队列时可能无法正确序列化为 JSON。

调度作业

编写作业类后,您可以使用 dispatch 辅助函数调度它。您只需将作业实例作为参数传递给 dispatch 辅助函数:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        dispatch(new ProcessPodcast($podcast));
    }
}
lightbulb

dispatch 辅助函数提供了一个简短、全局可用的函数的便利性,同时也非常容易测试。查看 Laravel 测试文档 以了解更多信息。

延迟调度

如果您希望延迟执行排队作业,可以在作业实例上使用 delay 方法。delay 方法由 Illuminate\Bus\Queueable trait 提供,该 trait 默认包含在所有生成的作业类中。例如,让我们指定一个作业在调度后 10 分钟内不可用进行处理:

php
<?php

namespace App\Http\Controllers;

use Carbon\Carbon;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        $job = (new ProcessPodcast($podcast))
                    ->delay(Carbon::now()->addMinutes(10));

        dispatch($job);
    }
}
exclamation

Amazon SQS 队列服务的最大延迟时间为 15 分钟。

自定义队列和连接

调度到特定队列

通过将作业推送到不同的队列,您可以“分类”您的排队作业,甚至可以优先处理您分配给各种队列的工作者数量。请记住,这不会将作业推送到您的队列配置文件中定义的不同队列“连接”,而只是推送到单个连接中的特定队列。要指定队列,请在作业实例上使用 onQueue 方法:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        $job = (new ProcessPodcast($podcast))->onQueue('processing');

        dispatch($job);
    }
}

调度到特定连接

如果您正在处理多个队列连接,可以指定将作业推送到哪个连接。要指定连接,请在作业实例上使用 onConnection 方法:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        $job = (new ProcessPodcast($podcast))->onConnection('sqs');

        dispatch($job);
    }
}

当然,您可以链式调用 onConnectiononQueue 方法,以指定作业的连接和队列:

php
$job = (new ProcessPodcast($podcast))
                ->onConnection('sqs')
                ->onQueue('processing');

指定最大作业尝试次数/超时值

最大尝试次数

指定作业最大尝试次数的一种方法是通过 Artisan 命令行上的 --tries 开关:

php
php artisan queue:work --tries=3

但是,您可以通过在作业类本身上定义最大尝试次数来采取更细粒度的方法。如果在作业上指定了最大尝试次数,它将优先于命令行上提供的值:

php
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 作业可以尝试的次数。
     *
     * @var int
     */
    public $tries = 5;
}

超时

exclamation

timeout 功能针对 PHP 7.1+ 和 pcntl PHP 扩展进行了优化。

同样,作业可以运行的最大秒数可以使用 Artisan 命令行上的 --timeout 开关指定:

php
php artisan queue:work --timeout=30

但是,您也可以在作业类本身上定义作业应允许运行的最大秒数。如果在作业上指定了超时,它将优先于命令行上指定的任何超时:

php
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 作业在超时前可以运行的秒数。
     *
     * @var int
     */
    public $timeout = 120;
}

错误处理

如果在作业处理期间抛出异常,作业将自动释放回队列,以便可以再次尝试。作业将继续释放,直到达到应用程序允许的最大尝试次数。最大尝试次数由 queue:work Artisan 命令上使用的 --tries 开关定义。或者,最大尝试次数可以在作业类本身上定义。有关运行队列工作者的更多信息,请参见下文

运行队列工作者

Laravel 包含一个队列工作者,它将在作业推送到队列时处理新作业。您可以使用 queue:work Artisan 命令运行工作者。请注意,一旦 queue:work 命令启动,它将继续运行,直到手动停止或关闭终端:

php
php artisan queue:work
lightbulb

要使 queue:work 进程永久在后台运行,您应该使用进程监视器(如 Supervisor)来确保队列工作者不会停止运行。

请记住,队列工作者是长时间运行的进程,并将已启动的应用程序状态存储在内存中。因此,它们在启动后不会注意到代码库中的更改。因此,在部署过程中,请确保重新启动队列工作者

处理单个作业

--once 选项可用于指示工作者仅处理队列中的单个作业:

php
php artisan queue:work --once

指定连接和队列

您还可以指定工作者应使用哪个队列连接。传递给 work 命令的连接名称应对应于 config/queue.php 配置文件中定义的连接之一:

php
php artisan queue:work redis

您可以通过仅处理给定连接的特定队列来进一步自定义队列工作者。例如,如果您的所有电子邮件都在 redis 队列连接上的 emails 队列中处理,您可以发出以下命令以启动仅处理该队列的工作者:

php
php artisan queue:work redis --queue=emails

资源考虑

守护进程队列工作者在处理每个作业之前不会“重启”框架。因此,您应该在每个作业完成后释放任何重资源。例如,如果您使用 GD 库进行图像处理,完成后应使用 imagedestroy 释放内存。

队列优先级

有时您可能希望优先处理队列。例如,在 config/queue.php 中,您可以将 redis 连接的默认 queue 设置为 low。但是,有时您可能希望将作业推送到 high 优先级队列,如下所示:

php
dispatch((new Job)->onQueue('high'));

要启动一个工作者,确保所有 high 队列作业在继续处理 low 队列上的任何作业之前都已处理完毕,请将队列名称的逗号分隔列表传递给 work 命令:

php
php artisan queue:work --queue=high,low

队列工作者与部署

由于队列工作者是长时间运行的进程,它们不会在不重新启动的情况下拾取代码更改。因此,使用队列工作者部署应用程序的最简单方法是在部署过程中重新启动工作者。您可以通过发出 queue:restart 命令优雅地重新启动所有工作者:

php
php artisan queue:restart

此命令将指示所有队列工作者在完成当前作业后优雅地“死亡”,以确保没有现有作业丢失。由于在执行 queue:restart 命令时队列工作者将死亡,您应该运行一个进程管理器(如 Supervisor)以自动重新启动队列工作者。

lightbulb

队列使用 缓存 存储重启信号,因此在使用此功能之前,您应验证应用程序已正确配置缓存驱动程序。

作业过期与超时

作业过期

config/queue.php 配置文件中,每个队列连接定义了一个 retry_after 选项。此选项指定队列连接在重试正在处理的作业之前应等待的秒数。例如,如果 retry_after 的值设置为 90,则如果作业已处理 90 秒而未被删除,它将被释放回队列。通常,您应将 retry_after 值设置为作业合理完成处理所需的最大秒数。

exclamation

唯一不包含 retry_after 值的队列连接是 Amazon SQS。SQS 将根据 AWS 控制台中管理的默认可见性超时重试作业。

工作者超时

queue:work Artisan 命令公开了一个 --timeout 选项。--timeout 选项指定 Laravel 队列主进程在终止正在处理作业的子队列工作者之前将等待的时间。有时,子队列进程可能会因各种原因而“冻结”,例如外部 HTTP 调用未响应。--timeout 选项会删除超过指定时间限制的冻结进程:

php
php artisan queue:work --timeout=60

retry_after 配置选项和 --timeout CLI 选项不同,但它们协同工作以确保作业不会丢失,并且作业仅成功处理一次。

exclamation

--timeout 值应始终比 retry_after 配置值短几秒钟。这将确保在重试作业之前,处理给定作业的工作者始终被终止。如果您的 --timeout 选项长于 retry_after 配置值,您的作业可能会被处理两次。

工作者休眠时间

当队列上有作业可用时,工作者将继续处理作业,而不会在它们之间有任何延迟。然而,sleep 选项决定了如果没有新作业可用,工作者将“休眠”多长时间。在休眠期间,工作者不会处理任何新作业 - 作业将在工作者再次唤醒后处理。

php
php artisan queue:work --sleep=3

Supervisor 配置

安装 Supervisor

Supervisor 是 Linux 操作系统的进程监视器,如果 queue:work 进程失败,它将自动重新启动。要在 Ubuntu 上安装 Supervisor,您可以使用以下命令:

bash
sudo apt-get install supervisor
lightbulb

如果自己配置 Supervisor 听起来很复杂,可以考虑使用 Laravel Forge,它将自动为您的 Laravel 项目安装和配置 Supervisor。

配置 Supervisor

Supervisor 配置文件通常存储在 /etc/supervisor/conf.d 目录中。在此目录中,您可以创建任意数量的配置文件,指示 Supervisor 如何监视您的进程。例如,让我们创建一个 laravel-worker.conf 文件,启动并监视一个 queue:work 进程:

ini
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log

在此示例中,numprocs 指令将指示 Supervisor 运行 8 个 queue:work 进程并监视所有进程,如果它们失败,将自动重新启动它们。当然,您应该更改 command 指令中的 queue:work sqs 部分,以反映您所需的队列连接。

启动 Supervisor

创建配置文件后,您可以使用以下命令更新 Supervisor 配置并启动进程:

bash
sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start laravel-worker:*

有关 Supervisor 的更多信息,请查阅 Supervisor 文档

处理失败的作业

有时您的排队作业会失败。别担心,事情并不总是按计划进行!Laravel 提供了一种方便的方法来指定作业应尝试的最大次数。在作业超过此尝试次数后,它将被插入到 failed_jobs 数据库表中。要为 failed_jobs 表创建迁移,您可以使用 queue:failed-table 命令:

bash
php artisan queue:failed-table

php artisan migrate

然后,在运行您的队列工作者时,您应使用 queue:work 命令上的 --tries 开关指定作业应尝试的最大次数。如果您未为 --tries 选项指定值,作业将无限期尝试:

bash
php artisan queue:work redis --tries=3

清理失败的作业

您可以直接在作业类上定义一个 failed 方法,允许您在发生故障时执行作业特定的清理。这是向用户发送警报或撤销作业执行的任何操作的理想位置。导致作业失败的 Exception 将被传递给 failed 方法:

php
<?php

namespace App\Jobs;

use Exception;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;

class ProcessPodcast implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * 创建一个新的作业实例。
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * 执行作业。
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // 处理上传的播客...
    }

    /**
     * 作业处理失败。
     *
     * @param  Exception  $exception
     * @return void
     */
    public function failed(Exception $exception)
    {
        // 发送用户失败通知等...
    }
}

失败作业事件

如果您希望注册一个在作业失败时调用的事件,可以使用 Queue::failing 方法。此事件是通过电子邮件或 HipChat 通知您的团队的绝佳机会。例如,我们可以从 Laravel 附带的 AppServiceProvider 中附加一个回调到此事件:

php
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\ServiceProvider;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 启动任何应用程序服务。
     *
     * @return void
     */
    public function boot()
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }

    /**
     * 注册服务提供者。
     *
     * @return void
     */
    public function register()
    {
        //
    }
}

重试失败的作业

要查看已插入 failed_jobs 数据库表中的所有失败作业,您可以使用 queue:failed Artisan 命令:

bash
php artisan queue:failed

queue:failed 命令将列出作业 ID、连接、队列和失败时间。作业 ID 可用于重试失败的作业。例如,要重试 ID 为 5 的失败作业,请发出以下命令:

bash
php artisan queue:retry 5

要重试所有失败的作业,请执行 queue:retry 命令并传递 all 作为 ID:

bash
php artisan queue:retry all

如果您希望删除失败的作业,可以使用 queue:forget 命令:

bash
php artisan queue:forget 5

要删除所有失败的作业,可以使用 queue:flush 命令:

bash
php artisan queue:flush

作业事件

使用 Queue facade 上的 beforeafter 方法,您可以指定在排队作业处理之前或之后执行的回调。这些回调是执行额外日志记录或为仪表板增加统计数据的绝佳机会。通常,您应该从 服务提供者 调用这些方法。例如,我们可以使用 Laravel 附带的 AppServiceProvider

php
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 启动任何应用程序服务。
     *
     * @return void
     */
    public function boot()
    {
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }

    /**
     * 注册服务提供者。
     *
     * @return void
     */
    public function register()
    {
        //
    }
}

使用 Queue facade 上的 looping 方法,您可以指定在工作者尝试从队列中获取作业之前执行的回调。例如,您可以注册一个闭包,以回滚任何由先前失败的作业留下的未完成事务:

php
Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
        DB::rollBack();
    }
});