Laravel framework: RabbitMQ message queue (using laravel-queue-rabbitmq)

1. Install laravel-queue-rabbitmq with Composer. When installing, choose the package version that matches your Laravel version.

composer require vladimir-yuldashev/laravel-queue-rabbitmq

2. In the config/app.php file, add the package's service provider to the providers array:
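
The provider entry itself is not shown in this step. As a minimal sketch, assuming the package exposes its service provider as VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider (confirm this against the README of the version you installed; with package auto-discovery, manual registration may not be needed at all), the entry would look like this:

```php
// config/app.php (fragment)
'providers' => [
    // ... existing providers ...

    // Assumed class name; verify it against the installed package version.
    VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class,
],
```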


3. Add the following configuration to the connections array in the config/queue.php configuration file:

'rabbitmq' => [
    'driver' => 'rabbitmq',
    'dsn' => env('RABBITMQ_DSN', null),

    /*
     * Can be any class that implements \Interop\Amqp\AmqpConnectionFactory, for example:
     * - \Enqueue\AmqpExt\AmqpConnectionFactory if you install enqueue/amqp-ext
     * - \Enqueue\AmqpLib\AmqpConnectionFactory if you install enqueue/amqp-lib
     * - \Enqueue\AmqpBunny\AmqpConnectionFactory if you install enqueue/amqp-bunny
     */
    'factory_class' => Enqueue\AmqpLib\AmqpConnectionFactory::class,

    'host' => env('RABBITMQ_HOST', ''),
    'port' => env('RABBITMQ_PORT', 5672),
    'vhost' => env('RABBITMQ_VHOST', '/'),
    'login' => env('RABBITMQ_LOGIN', 'guest'),
    'password' => env('RABBITMQ_PASSWORD', 'guest'),
    'queue' => env('RABBITMQ_QUEUE', 'default'),

    'options' => [
        'exchange' => [
            'name' => env('RABBITMQ_EXCHANGE_NAME'),

            // Determine if the exchange should be created if it does not exist.
            'declare' => env('RABBITMQ_EXCHANGE_DECLARE', true),

            // See the RabbitMQ documentation for the possible values.
            // 'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT),
            'type' => env('RABBITMQ_EXCHANGE_TYPE', 'direct'),
            'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
            'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),
            'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
            'arguments' => env('RABBITMQ_EXCHANGE_ARGUMENTS'),
        ],

        'queue' => [
            // Determine if the queue should be created if it does not exist.
            'declare' => env('RABBITMQ_QUEUE_DECLARE', true),

            // Determine if the queue should be bound to the exchange created.
            'bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true),

            // See the RabbitMQ documentation for the possible values.
            'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
            'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
            'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
            'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
            'arguments' => env('RABBITMQ_QUEUE_ARGUMENTS'),
        ],
    ],

    /*
     * Number of seconds to sleep if there is an error communicating with RabbitMQ.
     * If set to false, an exception is thrown instead of sleeping.
     */
    'sleep_on_error' => env('RABBITMQ_ERROR_SLEEP', 5),

    /*
     * Optional SSL params if an SSL connection is used.
     * An SSL connection also requires RabbitMQ itself to be configured for SSL;
     * see the RabbitMQ documentation for details.
     */
    'ssl_params' => [
        'ssl_on' => env('RABBITMQ_SSL', false),
        'cafile' => env('RABBITMQ_SSL_CAFILE', null),
        'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
        'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
        'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
        'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
    ],
],

4. Modify the .env file

QUEUE_CONNECTION=rabbitmq # usually already present in .env; change its value to rabbitmq
RABBITMQ_HOST=localhost # MQ server address
RABBITMQ_PORT=5672 # MQ port
RABBITMQ_LOGIN=danglei # MQ login name
RABBITMQ_PASSWORD=123456 # MQ password
RABBITMQ_QUEUE=test101 # MQ queue name
QOS_PREFETCH_COUNT=100 # number of messages the consumer prefetches at a time

5. Create the job class

php artisan make:job Queue

This generates the file app/Jobs/Queue.php. Edit it as follows:


<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;

class Queue implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    private $data;

    public function tags()
    {
        return ['demo'];
    }

    /**
     * Create a new job instance.
     * @return void
     */
    public function __construct($data)
    {
        $this->data = $data;
    }

    /**
     * Execute the job.
     * @return void
     */
    public function handle()
    {
        // Process business...
        $this->data['exec_time'] = date('Y-m-d H:i:s');
        echo json_encode($this->data);
    }
}

6. Produce: push data onto the MQ queue

<?php

/**
 * Desc: test
 * User: dl
 * Time: 2021-01-07
 */

namespace App\Http\Controllers\Queue;

use App\Http\Controllers\BaseWebController;
use Illuminate\Http\Request;
use App\Jobs\Queue;
use App\Jobs\Queue2;
use App\Libraries\Constants;
use Illuminate\Support\Facades\Log;

class Test extends BaseWebController
{
    /**
     * Desc: message queue
     * User: dl
     * Time: 2021-01-07
     * @param Request $request
     * @return array
     */
    public function getApiIndex(Request $request)
    {
        try {
            $params = $request->all();
            $params['request_time'] = date('Y-m-d H:i:s');

            // for ($i = 0; $i < 5; $i++) {
            //     $this->dispatch(new Queue($params));
            // }
            $this->dispatch(new Queue($params));

            // Fall back to null so a request without an id does not raise an error.
            return ['code' => Constants::SUCCESS, 'msg' => 'success', 'data' => $params['id'] ?? null];
        } catch (\Exception $e) {
            return ['code' => Constants::CURL_ERROR, 'msg' => $e->getMessage()];
        }
    }
}
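
The controller calls $this->dispatch(), which assumes the base controller provides that helper. Because the Queue job uses the Dispatchable trait, it can also be dispatched statically, with the connection and queue named explicitly. The following is only a sketch meant to run inside the Laravel application (for example from a controller or php artisan tinker); the connection name rabbitmq and the queue name test101 mirror the configuration and .env values above, and the payload keys are hypothetical:

```php
<?php

use App\Jobs\Queue;

// Hypothetical payload; in the controller this would be $request->all().
$params = ['id' => 1, 'request_time' => date('Y-m-d H:i:s')];

// Dispatchable::dispatch() returns a pending dispatch, so the target
// connection and queue can be set before the job is pushed.
Queue::dispatch($params)
    ->onConnection('rabbitmq') // connection key from config/queue.php
    ->onQueue('test101');      // queue name from RABBITMQ_QUEUE in .env
```

Naming the connection explicitly can help when QUEUE_CONNECTION points at a different driver in some environments.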

7. Consume the queue
Run the worker command to consume messages:

php artisan queue:work rabbitmq

Note: The laravel-queue-rabbitmq package requires the PHP sockets extension to be enabled; otherwise an error is reported.
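
One way to confirm the extension is available before starting a worker is a small standalone script. This is a minimal sketch; the file name check_sockets.php is arbitrary, and the result only reflects the PHP binary that runs the script (the CLI and the web server may load different php.ini files):

```php
<?php

// Run with: php check_sockets.php
if (extension_loaded('sockets')) {
    echo "sockets extension is loaded\n";
} else {
    // Report the problem on stderr and exit non-zero so scripts can detect it.
    fwrite(STDERR, "sockets extension is missing; enable it in php.ini\n");
    exit(1);
}
```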
