Laravel framework: RabbitMQ message queue (using laravel-queue-rabbitmq)

Reference documentation: https://learnku.com/docs/laravel/8.x/queues/9398

Package: https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq

1. Install laravel-queue-rabbitmq with Composer. When installing, choose the package version that matches the Laravel version of your application.

composer require vladimir-yuldashev/laravel-queue-rabbitmq

2. In config/app.php, add the service provider to the providers array:

VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class,

3. Add the following configuration to the connections array in the config/queue.php configuration file:

 'rabbitmq' => [
 
            'driver' => 'rabbitmq',
 
            'dsn' => env('RABBITMQ_DSN', null),
 
            /*
             * Can be any class that implements \Interop\Amqp\AmqpConnectionFactory, for example:
             * - \Enqueue\AmqpExt\AmqpConnectionFactory if you install enqueue/amqp-ext
             * - \Enqueue\AmqpLib\AmqpConnectionFactory if you install enqueue/amqp-lib
             * - \Enqueue\AmqpBunny\AmqpConnectionFactory if you install enqueue/amqp-bunny
             */
 
            'factory_class' => Enqueue\AmqpLib\AmqpConnectionFactory::class,
 
            'host' => env('RABBITMQ_HOST', '127.0.0.1'),
            'port' => env('RABBITMQ_PORT', 5672),
 
            'vhost' => env('RABBITMQ_VHOST', '/'),
            'login' => env('RABBITMQ_LOGIN', 'guest'),
            'password' => env('RABBITMQ_PASSWORD', 'guest'),
 
            'queue' => env('RABBITMQ_QUEUE', 'default'),
 
            'options' => [
 
                'exchange' => [
 
                    'name' => env('RABBITMQ_EXCHANGE_NAME'),
 
                    /*
                     * Determine if exchange should be created if it does not exist.
                     */
 
                    'declare' => env('RABBITMQ_EXCHANGE_DECLARE', true),
 
                    /*
                     * Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html
                     */
 
                    // 'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT),
                    'type' => env('RABBITMQ_EXCHANGE_TYPE', 'direct'),
                    'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
                    'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),
                    'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
                    'arguments' => env('RABBITMQ_EXCHANGE_ARGUMENTS'),
                ],
 
                'queue' => [
 
                    /*
                     * Determine if queue should be created if it does not exist.
                     */
 
                    'declare' => env('RABBITMQ_QUEUE_DECLARE', true),
 
                    /*
                     * Determine if queue should be bound to the exchange created.
                     */
 
                    'bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true),
 
                    /*
                     * Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html
                     */
 
                    'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
                    'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
                    'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
                    'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
                    'arguments' => env('RABBITMQ_QUEUE_ARGUMENTS'),
                ],
            ],
 
            /*
             * Determine the number of seconds to sleep if there's an error communicating with rabbitmq
             * If set to false, it'll throw an exception rather than doing the sleep for X seconds.
             */
 
            'sleep_on_error' => env('RABBITMQ_ERROR_SLEEP', 5),
 
            /*
             * Optional SSL params if an SSL connection is used
             * Using an SSL connection also requires configuring RabbitMQ to enable SSL. More details can be found here: https://www.rabbitmq.com/ssl.html
             */
 
            'ssl_params' => [
                'ssl_on' => env('RABBITMQ_SSL', false),
                'cafile' => env('RABBITMQ_SSL_CAFILE', null),
                'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
                'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
                'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
                'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
            ],
 
        ],
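
The dsn key above lets the whole connection be given as a single string instead of separate host, port, login and password values. The following is a minimal sketch, assuming the common amqp://login:password@host:port/vhost form with each component URL-encoded. buildAmqpDsn is a hypothetical helper and not part of the package, and how a DSN interacts with the individual keys depends on the connection factory in use.

```php
<?php

/**
 * Build an AMQP DSN from individual connection settings.
 * Hypothetical helper for illustration; not part of laravel-queue-rabbitmq.
 */
function buildAmqpDsn(string $host, int $port, string $login, string $password, string $vhost = '/'): string
{
    // URL-encode login, password and vhost so characters such as "/" or "@" stay unambiguous.
    return sprintf(
        'amqp://%s:%s@%s:%d/%s',
        rawurlencode($login),
        rawurlencode($password),
        $host,
        $port,
        rawurlencode($vhost)
    );
}

// Values taken from the .env example in step 4.
echo buildAmqpDsn('localhost', 5672, 'danglei', '123456', '/'), PHP_EOL;
// prints amqp://danglei:123456@localhost:5672/%2F
```

The resulting string could then be assigned to RABBITMQ_DSN in the .env file.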

4. Modify the .env file

#rabbitmq
QUEUE_CONNECTION=rabbitmq #usually already present in .env; change its value to rabbitmq
 
RABBITMQ_HOST=localhost #MQ server address
RABBITMQ_PORT=5672 #MQ port
RABBITMQ_VHOST=/
RABBITMQ_LOGIN=danglei #MQ login name
RABBITMQ_PASSWORD=123456 #MQ password
RABBITMQ_QUEUE=test101 #MQ queue name
QOS_PREFETCH_COUNT=100 #number of messages consumed at a time
#RABBITMQ_WORKER=horizon
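
Before moving on, it can help to confirm that the host and port configured above are reachable from the machine running PHP. The following is a minimal sketch using only built-in PHP functions. canReachBroker is a hypothetical helper, it checks only that the TCP port accepts connections (not the login or vhost), and getenv reads the process environment rather than the .env file, so the fallbacks repeat the example values.

```php
<?php

/**
 * Return true if a TCP connection to the broker can be opened.
 * Hypothetical helper: it does not verify credentials or the vhost.
 */
function canReachBroker(string $host, int $port, float $timeoutSeconds = 2.0): bool
{
    $errno = 0;
    $errstr = '';
    // @ suppresses the warning fsockopen emits when the connection fails.
    $socket = @fsockopen($host, $port, $errno, $errstr, $timeoutSeconds);
    if ($socket === false) {
        return false;
    }
    fclose($socket);

    return true;
}

// Fall back to the example values when the variables are not exported in the environment.
$host = getenv('RABBITMQ_HOST') ?: 'localhost';
$port = (int) (getenv('RABBITMQ_PORT') ?: 5672);

echo canReachBroker($host, $port) ? "broker port is reachable\n" : "broker port is not reachable\n";
```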

5. Create the job class

php artisan make:job Queue

This generates the file app/Jobs/Queue.php.

Example:

<?php
 
namespace App\Jobs;
 
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;
 
class Queue implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
 
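    /** @var array Payload passed in by the producer. */
    private $data;
 
    /**
     * Tags shown for this job in monitoring tools such as Horizon.
     *
     * @return array
     */
    public function tags()
    {
        return ['demo'];
    }
 
    /**
     * Create a new job instance.
     *
     * @param array $data
     * @return void
     */
    public function __construct($data)
    {
        $this->data = $data;
    }
 
    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        // Simulate business processing.
        sleep(1);
        $this->data['exec_time'] = date('Y-m-d H:i:s');
 
        Log::info($this->data);
        // Printed to the worker's console output.
        echo json_encode($this->data);
    }
}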

6. Produce: push data onto the MQ queue

<?php
/*
 * Desc: test
 * User: dl
 * Time: 2021-01-07
 */
 
namespace App\Http\Controllers\Queue;
 
use App\Http\Controllers\BaseWebController;
use Illuminate\Http\Request;
use App\Jobs\Queue;
use App\Jobs\Queue2;
use App\Libraries\Constants;
use Illuminate\Support\Facades\Log;
 
class Test extends BaseWebController
{
    /**
     * Desc: message queue
     * User: dl
     * Time: 2021-01-07
     * @param Request $request
     * @return array
     */
    public function getApiIndex(Request $request){
        try {
            $params = $request->all();
            $params['id']=rand(1,999);
            $params['mq']='Queue';
            $params['request_time']=date('Y-m-d H:i:s');
            // for ($i = 0; $i < 5; $i++) {
            //     $this->dispatch(new Queue($params));
            // }
            // Push the job onto the default queue connection (rabbitmq, per .env).
            $this->dispatch(new Queue($params));
 
            Log::info("\n\n\n".'-----'.$params['id']);
            
            return ['code' => Constants::SUCCESS, 'msg' => 'success','data' => $params['id']];
 
        } catch (\Exception $e) {
            Log::debug($e->getMessage());
            return ['code' => Constants::CURL_ERROR, 'msg' => $e->getMessage()];
        }
    }
}
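
The controller above relies on the default queue connection set in .env. As a hedged alternative, the job can also be dispatched through the static dispatch method that the Dispatchable trait adds, naming the connection and queue explicitly. This sketch assumes the App\Jobs\Queue class from step 5 and only runs inside a Laravel application; the connection and queue names are the ones from the earlier configuration examples.

```php
<?php

use App\Jobs\Queue;

// Sketch only: must run inside a Laravel application (for example in a controller or route).
$params = [
    'id' => rand(1, 999),
    'mq' => 'Queue',
    'request_time' => date('Y-m-d H:i:s'),
];

Queue::dispatch($params)
    ->onConnection('rabbitmq') // connection name defined in config/queue.php
    ->onQueue('test101');      // queue name used in the .env example
```

Naming the connection explicitly keeps this job on RabbitMQ even if QUEUE_CONNECTION is later changed to another driver.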

7. Consume the queue
Run the following command to start a worker on the rabbitmq connection:

php artisan queue:work rabbitmq

Note: The laravel-queue-rabbitmq package requires the PHP sockets extension to be enabled; otherwise an error is reported.
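
A quick way to check this requirement is sketched below, using PHP's built-in extension_loaded function. Run it with the same PHP binary that runs the queue worker, since the CLI and the web server may load different php.ini files.

```php
<?php

// Report whether the sockets extension is loaded in the PHP binary running this script.
if (extension_loaded('sockets')) {
    echo "sockets extension is enabled\n";
} else {
    echo "sockets extension is missing; enable it in php.ini or install it for this PHP build\n";
}
```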

【Related Links】

(1) Installing RabbitMQ on CentOS7 https://blog.csdn.net/weixin_37689230/article/details/112276503
(2) Laravel integrates rabbitmq message queue https://blog.csdn.net/weixin_37689230/article/details/112321216
(3) Horizon queue management tool https://blog.csdn.net/weixin_37689230/article/details/112366571
(4) Basic knowledge of RabbitMQ https://blog.csdn.net/weixin_37689230/article/details/112542844