<?php
declare(strict_types = 1);
namespace App\Components\Amqp;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Di\Annotation\AnnotationCollector;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
/**
 * AMQP producer that declares the target queue and exchange, binds them together,
 * and then publishes the message over a pooled connection.
 *
 * NOTE(review): the class name suggests the queue arguments carry delay settings;
 * those are supplied entirely by the caller's AmqpBuilder — confirm against callers.
 */
class DelayProducer extends Builder
{
    /**
     * Publish a message by running produceMessage() through the retry() helper.
     *
     * @param ProducerMessageInterface $producerMessage message supplying payload, properties, exchange and routing key
     * @param AmqpBuilder $queueBuilder definition of the queue to declare and bind
     * @param bool $confirm when true a confirm channel is used and the result reflects the ack handler
     * @param int $timeout value handed to wait_for_pending_acks_returns()
     *
     * @return bool true when not in confirm mode; otherwise whether the ack handler fired
     */
    public function produce(
        ProducerMessageInterface $producerMessage,
        AmqpBuilder $queueBuilder,
        bool $confirm = false,
        int $timeout = 5
    ): bool {
        $attempt = function () use ($producerMessage, $queueBuilder, $confirm, $timeout) {
            return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
        };

        return retry(1, $attempt);
    }

    /**
     * Perform a single publish attempt: declare queue and exchange, bind, publish, wait for acks.
     *
     * The connection is always released back to its pool; on any failure it is
     * reconnected first and the original exception is rethrown.
     *
     * @param ProducerMessageInterface $producerMessage message to publish
     * @param AmqpBuilder $queueBuilder definition of the queue to declare and bind
     * @param bool $confirm whether to publish on a confirm channel
     * @param int $timeout value handed to wait_for_pending_acks_returns()
     *
     * @return bool true when not in confirm mode; otherwise whether the ack handler fired
     *
     * @throws Throwable anything raised while declaring, binding, publishing or waiting
     */
    private function produceMessage(
        ProducerMessageInterface $producerMessage,
        AmqpBuilder $queueBuilder,
        bool $confirm = false,
        int $timeout = 5
    ): bool {
        // Flipped to true by the ack handler registered below.
        $acknowledged = false;

        // Let a Producer annotation (if any) override routing key / exchange before reading them.
        $this->injectMessageProperty($producerMessage);

        $amqpMessage = new AMQPMessage(
            $producerMessage->payload(),
            $producerMessage->getProperties()
        );

        $connectionPool = $this->getConnectionPool($producerMessage->getPoolName());
        $connection = $connectionPool->get();

        $channel = $confirm
            ? $connection->getConfirmChannel()
            : $connection->getChannel();

        $channel->set_ack_handler(function () use (&$acknowledged) {
            $acknowledged = true;
        });

        try {
            $exchangeBuilder = $producerMessage->getExchangeBuilder();

            $channel->queue_declare(
                $queueBuilder->getQueue(),
                $queueBuilder->isPassive(),
                $queueBuilder->isDurable(),
                $queueBuilder->isExclusive(),
                $queueBuilder->isAutoDelete(),
                $queueBuilder->isNowait(),
                $queueBuilder->getArguments(),
                $queueBuilder->getTicket()
            );

            $channel->exchange_declare(
                $exchangeBuilder->getExchange(),
                $exchangeBuilder->getType(),
                $exchangeBuilder->isPassive(),
                $exchangeBuilder->isDurable(),
                $exchangeBuilder->isAutoDelete(),
                $exchangeBuilder->isInternal(),
                $exchangeBuilder->isNowait(),
                $exchangeBuilder->getArguments(),
                $exchangeBuilder->getTicket()
            );

            // Binding happens only after both the queue and the exchange have been declared.
            $channel->queue_bind(
                $queueBuilder->getQueue(),
                $producerMessage->getExchange(),
                $producerMessage->getRoutingKey()
            );

            $channel->basic_publish(
                $amqpMessage,
                $producerMessage->getExchange(),
                $producerMessage->getRoutingKey()
            );

            $channel->wait_for_pending_acks_returns($timeout);
        } catch (Throwable $failure) {
            // Reconnect before the connection is released in the finally block.
            $connection->reconnect();

            throw $failure;
        } finally {
            $connection->release();
        }

        // Outside confirm mode the ack flag is ignored and the publish counts as successful.
        if (! $confirm) {
            return true;
        }

        return $acknowledged;
    }

    /**
     * Copy routing key and exchange from the message class's Producer annotation, when present.
     *
     * Does nothing if AnnotationCollector is unavailable or the class carries no
     * Producer annotation; empty annotation values leave the message untouched.
     *
     * @param ProducerMessageInterface $producerMessage message whose routing key / exchange may be overridden
     */
    private function injectMessageProperty(ProducerMessageInterface $producerMessage): void
    {
        if (! class_exists(AnnotationCollector::class)) {
            return;
        }

        $producerAnnotation = AnnotationCollector::getClassAnnotation(
            get_class($producerMessage),
            Producer::class
        );

        if (! $producerAnnotation) {
            return;
        }

        if ($producerAnnotation->routingKey) {
            $producerMessage->setRoutingKey($producerAnnotation->routingKey);
        }

        if ($producerAnnotation->exchange) {
            $producerMessage->setExchange($producerAnnotation->exchange);
        }
    }
}