如何基于Hyperf实现RabbitMQ+WebSocket消息推送

来源:藏色散人 发布时间:2020-05-07 10:31:47 阅读量:1098

介绍

基于 Hyperf+ WebSocket +RabbitMQ 实现的一个简单大屏幕的消息推送。

思路

利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,

保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。

WebSocket 服务

1

composer require hyperf/websocket-server

配置文件 [config/autoload/server.php]

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

<?php

return [

    'mode' => SWOOLE_PROCESS,

    'servers' => [

        [

            'name' => 'http',

            'type' => Server::SERVER_HTTP,

            'host' => '0.0.0.0',

            'port' => 11111,

            'sock_type' => SWOOLE_SOCK_TCP,

            'callbacks' => [

                SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'],

            ],

        ],

        [

            'name' => 'ws',

            'type' => Server::SERVER_WEBSOCKET,

            'host' => '0.0.0.0',

            'port' => 12222,

            'sock_type' => SWOOLE_SOCK_TCP,

            'callbacks' => [

                SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],

                SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],

                SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],

            ],

        ],

    ],

WebSocket 服务器端代码示例

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

<?php

declare(strict_types=1);

/**

 * This file is part of Hyperf.

 *

 * @link     https://www.hyperf.io

 * @document https://doc.hyperf.io

 * @contact  group@hyperf.io

 * @license  https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE

 */

namespace App\Controller;

use Hyperf\Contract\OnCloseInterface;

use Hyperf\Contract\OnMessageInterface;

use Hyperf\Contract\OnOpenInterface;

use Swoole\Http\Request;

use Swoole\Server;

use Swoole\Websocket\Frame;

use Swoole\WebSocket\Server as WebSocketServer;

class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface

{

    /**

     * 发送消息

     * @param WebSocketServer $server

     * @param Frame $frame

     */

    public function onMessage(WebSocketServer $server, Frame $frame): void

    {

        //心跳刷新缓存

        $redis = $this->container->get(\Redis::class);

        //获取所有的客户端id

        $fdList = $redis->sMembers('websocket_sjd_1');

        //如果当前客户端在客户端集合中,就刷新

        if (in_array($frame->fd, $fdList)) {

            $redis->sAdd('websocket_sjd_1', $frame->fd);

            $redis->expire('websocket_sjd_1', 7200);

        }

        $server->push($frame->fd, 'Recv: ' . $frame->data);

    }

    /**

     * 客户端失去链接

     * @param Server $server

     * @param int $fd

     * @param int $reactorId

     */

    public function onClose(Server $server, int $fd, int $reactorId): void

    {

        //删掉客户端id

        $redis = $this->container->get(\Redis::class);

        //移除集合中指定的value

        $redis->sRem('websocket_sjd_1', $fd);

        var_dump('closed');

    }

    /**

     * 客户端链接

     * @param WebSocketServer $server

     * @param Request $request

     */

    public function onOpen(WebSocketServer $server, Request $request): void

    {

        //保存客户端id

        $redis = $this->container->get(\Redis::class);

        $res1 = $redis->sAdd('websocket_sjd_1', $request->fd);

        var_dump($res1);

        $res = $redis->expire('websocket_sjd_1', 7200);

        var_dump($res);

        $server->push($request->fd, 'Opened');

    }

}

WebSocket 前端代码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

function WebSocketTest() {

    if ("WebSocket" in window) {

        console.log("您的浏览器支持 WebSocket!");

        var num = 0

        // 打开一个 web socket

        var ws = new WebSocket("ws://127.0.0.1:12222");

        ws.onopen = function () {

            // Web Socket 已连接上,使用 send() 方法发送数据

            //alert("数据发送中...");

            //ws.send("发送数据");

        };

        window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开

            var ping = {"type": "ping"};

            ws.send(JSON.stringify(ping));

        }, 5000);

        ws.onmessage = function (evt) {

            var d = JSON.parse(evt.data);

            console.log(d);

            if (d.code == 300) {

                $(".address").text(d.address)

            }

            if (d.code == 200) {

                var v = d.data

                console.log(v);

                num++

                var str = `<div class="item">

                                <p>${v.recordOutTime}</p>

                                <p>${v.userOutName}</p>

                                <p>${v.userOutNum}</p>

                                <p>${v.doorOutName}</p>

                            </div>`

                $(".tableHead").after(str)

                if (num > 7) {

                    num--

                    $(".table .item:nth-last-child(1)").remove()

                }

            }

        };

        ws.error = function (e) {

            console.log(e)

            alert(e)

        }

        ws.onclose = function () {

            // 关闭 websocket

            alert("连接已关闭...");

        };

    } else {

        alert("您的浏览器不支持 WebSocket!");

    }

}

AMQP 组件

1

composer require hyperf/amqp

配置文件 [config/autoload/amqp.php]

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

<?php

return [

    'default' => [

        'host' => 'localhost',

        'port' => 5672,

        'user' => 'guest',

        'password' => 'guest',

        'vhost' => '/',

        'pool' => [

            'min_connections' => 1,

            'max_connections' => 10,

            'connect_timeout' => 10.0,

            'wait_timeout' => 3.0,

            'heartbeat' => -1,

        ],

        'params' => [

            'insist' => false,

            'login_method' => 'AMQPLAIN',

            'login_response' => null,

            'locale' => 'en_US',

            'connection_timeout' => 3.0,

            'read_write_timeout' => 6.0,

            'context' => null,

            'keepalive' => false,

            'heartbeat' => 3,

        ],

    ],

];

MQ 消费者代码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

<?php

declare(strict_types=1);

namespace App\Amqp\Consumer;

use Hyperf\Amqp\Annotation\Consumer;

use Hyperf\Amqp\Message\ConsumerMessage;

use Hyperf\Amqp\Result;

use Hyperf\Server\Server;

use Hyperf\Server\ServerFactory;

/**

 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)

 */

class DemoConsumer extends ConsumerMessage

{

    /**

     * rabbmitMQ消费端代码

     * @param $data

     * @return string

     */

    public function consume($data): string

    {

        print_r($data);

        //获取集合中所有的value

        $redis = $this->container->get(\Redis::class);

        $fdList=$redis->sMembers('websocket_sjd_1');

        $server=$this->container->get(ServerFactory::class)->getServer()->getServer();

        foreach($fdList as $key=>$v){

            if(!empty($v)){

                $server->push((int)$v, $data);

            }

        }

        return Result::ACK;

    }

}

控制器代码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

/**

 * test

 * @return array

 */

public function test()

{

    $data = array(

        'code' => 200,

        'data' => [

            'userOutName' => 'ccflow',

            'userOutNum' => '9999',

            'recordOutTime' => date("Y-m-d H:i:s", time()),

            'doorOutName' => '教师公寓',

        ]

    );

    $data = \GuzzleHttp\json_encode($data);

    $message = new DemoProducer($data);

    $producer = ApplicationContext::getContainer()->get(Producer::class);

    $result = $producer->produce($message);

    var_dump($result);

    $user = $this->request->input('user', 'Hyperf');

    $method = $this->request->getMethod();

    return [

        'method' => $method,

        'message' => "{$user}.",

    ];

}


标签: PHP
分享:
评论:
你还没有登录,请先