Use PHP WorkerMan to build WebSocket full-duplex group chat communication (2)

A long time ago, before the WebSocket protocol was invented, when people made dynamic updates of real-time data on the Web, they generally used polling and long connections (Long Polling) to achieve it. Probably:

Polling: The client continuously sends HTTP requests to the server, and the server returns the latest data.
Long connection: The client sends an HTTP request to the server, and the server HOLDs the connection until there is new data and then returns.
Applications at that time included WebQQ, FaceBook IM, etc.

However, this implementation has a very big flaw. HTTP requests are half-duplex, and the client can only send requests to the server and return them. A large number of requests may cause problems such as CPU resource usage and memory overflow. So the WebSocket protocol was invented, similar to the HTTP protocol, with the address: ws:// (HTTP page) or wss:// (HTTPS page).

WebSocket is Full Duplex, which means that the server can also send data to the client. For example, when chatting, the client’s request can be omitted. When the other client has data submitted to the server, the server directly sends it to the current client.

The more well-known WebSocket frameworks include http://Socket.io (node.js), Workerman (PHP), Swoole (PHP), etc. (I have only tried the first two)

The group chat function of Pokers is implemented by polling, but my 1H1M1G small water pipe server cannot withstand the continuous increase in the number of users. I must try to use WebSocket to implement it.

<?php

//Introduce composer
require '../vendor/autoload.php';
require_once '../vendor/workerman/workerman/Autoloader.php';
require_once '../vendor/workerman/channel/src/Server.php'; //Workerman group sending
require_once '../vendor/workerman/channel/src/Client.php'; //Workerman group sending
define('LAZER_DATA_PATH', dirname(dirname(__FILE__)) . '/data/'); //json database used by Pokers

use Lazer\Classes\Database as Lazer;
use Workerman\Worker;
use Workerman\Lib\Timer;


$channel_server = new Channel\Server('0.0.0.0', 2206); //Group server address
$worker = new Worker('websocket://0.0.0.0:2000'); //WebSocket address
$worker->count = 2; //The number of Workerman processes
// Global group to connection mapping array
$group_con_map = array();

$worker->onWorkerStart = function ($worker) {<!-- -->
    //Channel client connects to Channel server
    Channel\Client::connect('0.0.0.0', 2206);

    //Listen to the global group sending message event
    Channel\Client::on('send', function ($event_data) {<!-- -->
        $thread = $event_data['thread_id'];
        $con_id = $event_data['con_id'];
        $mes_id = $event_data['mes_id'];
        $speaker = $event_data['speaker'];
        $class = $event_data['class_id'];

        $array = Lazer::table('messages')->limit(1)->where('id', '=', (int) $mes_id)->andWhere('speaker\ ', '=', (int) $speaker)->andWhere('belong_class', '=', (int) $class)->find()->asArray();

        if (!!$array[0]['speaker']) {<!-- -->
            global $group_con_map;
            if (isset($group_con_map[$thread])) {<!-- -->
                foreach ($group_con_map[$thread] as $con) {<!-- -->
                    $con->send(json_encode($array[0])); //Send data to each member of the group
                }
            }
        } else {<!-- -->
            $array = [
                'op' => 'sent',
                'status' => false,
                'code' => 108,
                'msg' => 'Illegal Request'
            ];
            global $group_con_map;
            $group_con_map[$thread][$con_id]->send(json_encode($array));
        }
    });

    //Heartbeat timing
    Timer::add(55, function () use ($worker) {<!-- -->
        foreach ($worker->connections as $connection) {<!-- -->
            $array = [
                'op' => 'keep'
            ];
            $connection->send(json_encode($array));
        }
    });
};

//Send a message
$worker->onMessage = function ($con, $data) {<!-- -->

    $data = json_decode($data, true);
    $cmd = $data['action'];
    $thread = $data['thread_id'];
    $class = $data['class_id'];
    $user = $data['speaker'];
    $user_name = $data['speaker_name'];
    @$mes_id = $data['mes_id'];

    if (!empty($user_name) & amp; & amp; !empty($thread) & amp; & amp; !empty($class) & amp; & amp; !empty($user)) {<!-- -->
        $array = Lazer::table('classes')->limit(1)->where('id', '=', (int) $class)->find()->asArray ();
        if (!!$array) {<!-- -->
            $array = Lazer::table('threads')->limit(1)->where('id', '=', (int) $thread)->andWhere('belong_class\ ', '=', (int) $class)->find()->asArray();
            if (!!$array) {<!-- -->
                $array = Lazer::table('users')->limit(1)->where('id', '=', (int) $user)->andWhere('name\ ', '=', (string) $user_name)->find()->asArray();
                if (!!$array & amp; & amp; in_array((string) $class, explode(',', $array[0]['class']))) {<!-- -- > //Determine if the user exists

                    switch ($cmd) {<!-- -->
                        case "join": //Client joins group
                            global $group_con_map;
                            //Add the connection to the corresponding group array
                            $group_con_map[$thread][$con->id] = $con;
                            $array = [
                                'op' => 'join',
                                'thread' => $thread,
                                'status' => true,
                                'code' => 100
                            ];
                            break;
                        case "send": //Client sends content
                            Channel\Client::publish('send', array(
                                'thread_id' => $thread,
                                'class_id' => $class,
                                'speaker' => $user,
                                'speaker_name' => $user_name,
                                'con_id' => $con->id,
                                'mes_id' => $mes_id
                            ));
                            $array = [
                                'op' => 'send',
                                'status' => true,
                                'code' => 105
                            ];
                            break;
                        default:
                            $array = [
                                'op' => 'send',
                                'status' => false,
                                'code' => 101,
                                'msg' => 'Illegal request'
                            ];
                            break;
                    }
                } else {<!-- -->
                    $array = [
                        'op' => 'send',
                        'status' => false,
                        'code' => 107,
                        'msg' => 'User does not exist or not in the class'
                    ];
                }
            } else {<!-- -->
                $array = [
                    'op' => 'send',
                    'status' => false,
                    'code' => 102,
                    'msg' => 'Thread does not exist'
                ];
            }
        } else {<!-- -->
            $array = [
                'op' => 'send',
                'status' => false,
                'code' => 103,
                'msg' => 'Class does not exist'
            ];
        }
    } else {<!-- -->
        $array = [
            'op' => 'send',
            'status' => false,
            'code' => 104,
            'msg' => 'Illegal request'
        ];
    }
    $con->send(json_encode($array));
};

// This is very important. When the connection is closed, delete the connection from the global group data to avoid memory leaks.
$worker->onClose = function ($con) {<!-- -->
    global $group_con_map;
    if (isset($con->group_id)) {<!-- -->
        unset($group_con_map[$con->group_id][$con->id]);
        if (empty($group_con_map[$con->group_id])) {<!-- -->
            unset($group_con_map[$con->group_id]);
        }
    }
};

$worker->onConnect = function ($con) {<!-- -->
    $array = [
        'op' => 'connect',
        'status' => true
    ];
    $con->send(json_encode($array));
};

Worker::runAll();

Front-end js

//websocket connection
        this.ws = new WebSocket('wss://pokers.zeo.im/wss');
        this.ws.onmessage = function (data) {<!-- -->
            var re = eval('(' + data.data + ')');
            switch (re.op) {<!-- -->
                case 'send':
                    if (!re.status) {<!-- -->
                        antd.$message.error('Service Unavailable');
                    }
                    break;
                case 'connect':
                    console.log('Connected to Pokers Server');
                    break;
                case 'join':
                    if (!re.status) {<!-- -->
                        antd.$message.error('Service Unavailable');
                    }
                    break;
                case 'keep':
                    break;
                default:
                    //Add a paragraph after the content paragraph
                    antd.opened_mes_info.meses.push(re);
                    if (parseInt(re.speaker) !== parseInt(antd.user.id)) {<!-- -->
                        if ($(window).height() + $('#mes-container').scrollTop() >= $('#mes-inner').height()) {<!-- - ->
                            //The visible area of the current window + the sliding distance is greater than the total sliding height. If there is an update, go directly to the bottom.
                            antd.bottom_mes();
                        } else {<!-- -->
                            antd.unread.visible = true;
                            setTimeout(function () {<!-- -->
                                antd.unread.visible = false;
                            }, 1000);
                        }
                    }
                    antd.update_mes();
                    break;
            }
        };

JavaScript connection code

//Broadcast all thread online users
this.ws.send('{"action":"send", "thread_id":' + antd.opened_mes_info.thread_id + ', "class_id":' + antd. opened_mes_info.class_id + ', "speaker":' + antd.user.id + ',"speaker_name":"' + antd.user.info.name + '",\ "mes_id":' + res.data.code + '}');

JavaScript send code

//Join the current Thread
this.ws.send('{"action":"join", "thread_id":' + id + ', "class_id":' + belong_class + ', \ "speaker":' + antd.user.id + ',"speaker_name":"' + antd.user.info.name + '"}');