PHP 框架 Hyperf 实现处理超时未支付订单和延时队列

来源:凉官灰 发布时间:2020-05-18 10:38:14 阅读量:1618

延时队列

  • DelayProducer.php

  • AmqpBuilder.php

AmqpBuilder.php

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

<?php

declare(strict_types = 1);

namespace App\Components\Amqp;

use Hyperf\Amqp\Builder\Builder;

use Hyperf\Amqp\Builder\QueueBuilder;

class AmqpBuilder extends QueueBuilder
{
    /**
     * Merge extra queue arguments into the arguments already held by this builder.
     *
     * The merge is done with array_merge(), so for string keys a value in
     * $arguments replaces an existing value with the same key.
     *
     * NOTE(review): array_merge() only accepts arrays, so an AMQPTable instance
     * is not actually usable here despite the documented type — confirm that
     * callers only ever pass arrays.
     *
     * @param array|\PhpAmqpLib\Wire\AMQPTable $arguments Arguments to merge in.
     *
     * @return \Hyperf\Amqp\Builder\Builder This builder, for chaining.
     */
    public function setArguments($arguments) : Builder
    {
        $merged          = array_merge($this->arguments, $arguments);
        $this->arguments = $merged;

        return $this;
    }

    /**
     * Configure this builder as a delay queue.
     *
     * Sets the message TTL, the dead-letter exchange and the dead-letter
     * routing key as queue arguments, then sets the queue name.
     *
     * @param string $queueName             Name of the queue to declare.
     * @param int    $xMessageTtl           Message time-to-live in seconds (converted to milliseconds here).
     * @param string $xDeadLetterExchange   Value for the x-dead-letter-exchange argument.
     * @param string $xDeadLetterRoutingKey Value for the x-dead-letter-routing-key argument.
     *
     * @return $this
     */
    public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self
    {
        // The TTL argument is expressed in milliseconds.
        $ttlMilliseconds = $xMessageTtl * 1000;

        // Each value is a [type tag, value] pair; presumably 'I' / 'S' are the
        // php-amqplib symbols for integer / string fields — TODO confirm.
        $delayArguments = [
            'x-message-ttl'             => ['I', $ttlMilliseconds],
            'x-dead-letter-exchange'    => ['S', $xDeadLetterExchange],
            'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],
        ];

        $this->setArguments($delayArguments);
        $this->setQueue($queueName);

        return $this;
    }
}

DelayProducer.php

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

<?php

declare(strict_types = 1);

namespace App\Components\Amqp;

use Hyperf\Amqp\Annotation\Producer;

use Hyperf\Amqp\Builder;

use Hyperf\Amqp\Message\ProducerMessageInterface;

use Hyperf\Di\Annotation\AnnotationCollector;

use PhpAmqpLib\Message\AMQPMessage;

use Throwable;

class DelayProducer extends Builder
{
    /**
     * Declare the queue described by $queueBuilder, bind it to the message's
     * exchange and publish the message to that exchange.
     *
     * The work is wrapped in retry(1, ...), so a failure is retried.
     *
     * @param ProducerMessageInterface $producerMessage Message to publish.
     * @param AmqpBuilder              $queueBuilder    Definition of the queue to declare and bind.
     * @param bool                     $confirm         Use a confirm channel and report the broker ack as the result.
     * @param int                      $timeout         Timeout passed to wait_for_pending_acks_returns().
     *
     * @return bool When $confirm is true, whether the ack handler fired; otherwise always true.
     * @throws \Throwable Any error raised while declaring, binding or publishing.
     */
    public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        return retry(1, function () use ($producerMessage, $queueBuilder, $confirm, $timeout) {
            return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
        });
    }

    /**
     * Perform a single publish attempt on a pooled connection.
     *
     * The connection is always released back to the pool, including when
     * obtaining the channel fails; on any error the connection is reconnected
     * before being released.
     *
     * @param ProducerMessageInterface $producerMessage Message to publish.
     * @param AmqpBuilder              $queueBuilder    Definition of the queue to declare and bind.
     * @param bool                     $confirm         Use a confirm channel and report the broker ack as the result.
     * @param int                      $timeout         Timeout passed to wait_for_pending_acks_returns().
     *
     * @return bool When $confirm is true, whether the ack handler fired; otherwise always true.
     * @throws \Throwable Re-thrown after the connection has been reconnected.
     */
    private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        $result = false;

        $this->injectMessageProperty($producerMessage);

        $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
        $pool    = $this->getConnectionPool($producerMessage->getPoolName());

        /** @var \Hyperf\Amqp\Connection $connection */
        $connection = $pool->get();

        try {
            // Acquire the channel inside the try block so that a failure here
            // still releases the connection in the finally clause.
            $channel = $confirm ? $connection->getConfirmChannel() : $connection->getChannel();

            // Flip $result once the broker acknowledges the publish.
            $channel->set_ack_handler(static function () use (&$result) : void {
                $result = true;
            });

            // Delay-queue handling: the exchange definition comes from the message.
            $exchangeBuilder = $producerMessage->getExchangeBuilder();

            // Declare the queue.
            $channel->queue_declare(
                $queueBuilder->getQueue(),
                $queueBuilder->isPassive(),
                $queueBuilder->isDurable(),
                $queueBuilder->isExclusive(),
                $queueBuilder->isAutoDelete(),
                $queueBuilder->isNowait(),
                $queueBuilder->getArguments(),
                $queueBuilder->getTicket()
            );

            // Declare the exchange.
            $channel->exchange_declare(
                $exchangeBuilder->getExchange(),
                $exchangeBuilder->getType(),
                $exchangeBuilder->isPassive(),
                $exchangeBuilder->isDurable(),
                $exchangeBuilder->isAutoDelete(),
                $exchangeBuilder->isInternal(),
                $exchangeBuilder->isNowait(),
                $exchangeBuilder->getArguments(),
                $exchangeBuilder->getTicket()
            );

            // Bind the queue to the exchange.
            $channel->queue_bind($queueBuilder->getQueue(), $producerMessage->getExchange(), $producerMessage->getRoutingKey());

            // Publish the message and wait for pending acks/returns.
            $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
            $channel->wait_for_pending_acks_returns($timeout);
        } catch (Throwable $exception) {
            // Reconnect the connection before release.
            $connection->reconnect();

            throw $exception;
        } finally {
            $connection->release();
        }

        return $confirm ? $result : true;
    }

    /**
     * Copy routing key and exchange from the message class's Producer
     * annotation onto the message, when such an annotation exists.
     *
     * Empty annotation values are ignored, leaving the message's own values.
     *
     * @param ProducerMessageInterface $producerMessage Message to update in place.
     */
    private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void
    {
        if (! class_exists(AnnotationCollector::class)) {
            return;
        }

        /** @var \Hyperf\Amqp\Annotation\Producer $annotation */
        $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);

        if (! $annotation) {
            return;
        }

        if ($annotation->routingKey) {
            $producerMessage->setRoutingKey($annotation->routingKey);
        }

        if ($annotation->exchange) {
            $producerMessage->setExchange($annotation->exchange);
        }
    }
}

处理超时订单

  • OrderQueueConsumer.php

  • OrderQueueProducer.php

OrderQueueProducer.php

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

<?php

declare(strict_types = 1);

namespace App\Amqp\Producer;

use Hyperf\Amqp\Annotation\Producer;

use Hyperf\Amqp\Builder\ExchangeBuilder;

use Hyperf\Amqp\Message\ProducerMessage;

/**
 * Producer message carrying order data; exchange and routing key are supplied
 * by the annotation below.
 *
 * @Producer(exchange="order_exchange", routingKey="order_exchange")
 */
class OrderQueueProducer extends ProducerMessage
{
    /**
     * Store the given data as the message payload.
     *
     * @param mixed $data Payload to send.
     */
    public function __construct($data)
    {
        $this->payload = $data;
    }

    /**
     * Return the exchange definition for this message.
     *
     * Currently delegates to the parent implementation unchanged; this is the
     * place to customise the exchange if needed.
     */
    public function getExchangeBuilder() : ExchangeBuilder
    {
        $exchangeBuilder = parent::getExchangeBuilder();

        return $exchangeBuilder;
    }
}

OrderQueueConsumer.php

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

<?php

declare(strict_types = 1);

namespace App\Amqp\Consumer;

use App\Service\CityTransport\OrderService;

use Hyperf\Amqp\Result;

use Hyperf\Amqp\Annotation\Consumer;

use Hyperf\Amqp\Message\ConsumerMessage;

/**
 * Consumer for the queue, exchange and routing key named in the annotation below.
 *
 * @Consumer(exchange="delay_exchange", routingKey="delay_route", queue="delay_queue", name ="OrderQueueConsumer", nums=1)
 */
class OrderQueueConsumer extends ConsumerMessage
{
    /**
     * Handle one message taken from the queue.
     *
     * @param mixed $data Decoded message payload.
     *
     * @return string A Result constant telling the framework how to settle the message.
     */
    public function consume($data) : string
    {
        // Business logic for the order goes here.

        // The declared string return type requires an explicit result;
        // acknowledge the message once it has been handled.
        return Result::ACK;
    }

    /**
     * Whether this consumer should be started.
     *
     * @return bool Always true.
     */
    public function isEnable() : bool
    {
        return true;
    }
}

Demo

1

2

3

4

// Queue 'order_exchange' with a TTL of 1 second (setDelayedQueue converts it to
// milliseconds), dead-letter exchange 'delay_exchange' and dead-letter routing
// key 'delay_route'.
$builder = new AmqpBuilder();
$builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');

// Resolve the producer from the container and publish a message with a random order number.
$que = ApplicationContext::getContainer()->get(DelayProducer::class);

var_dump($que->produce(new OrderQueueProducer(['order_sn' => (string)mt_rand(10000, 90000)]), $builder));


标签: PHP 环境搭建
分享:
评论:
你还没有登录,请先