1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: 19: 20: 21: 22: 23: 24: 25: 26: 27: 28: 29: 30: 31: 32: 33: 34: 35: 36: 37: 38: 39: 40: 41: 42: 43: 44: 45: 46: 47: 48: 49: 50: 51: 52: 53: 54: 55: 56: 57: 58: 59: 60: 61: 62: 63: 64: 65: 66: 67: 68: 69: 70: 71: 72: 73: 74: 75: 76: 77: 78: 79: 80: 81: 82: 83: 84: 85: 86: 87: 88: 89: 90: 91: 92: 93: 94: 95: 96: 97: 98: 99: 100: 101: 102: 103: 104: 105: 106: 107: 108: 109: 110: 111: 112: 113: 114: 115: 116: 117: 118: 119: 120: 121: 122: 123: 124: 125: 126: 127: 128: 129: 130: 131: 132: 133: 134: 135: 136: 137: 138: 139: 140: 141: 142: 143: 144: 145: 146: 147: 148: 149: 150: 151: 152: 153: 154: 155: 156: 157: 158: 159: 160: 161: 162: 163: 164: 165: 166: 167: 168: 169: 170: 171: 172: 173: 174: 175: 176: 177: 178: 179: 180: 181: 182: 183: 184:
<?php
namespace SAREhub\Client\Amqp;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Message\AMQPMessage;
use SAREhub\Client\Message\BasicExchange;
use SAREhub\Client\Message\Message;
use SAREhub\Commons\Service\ServiceSupport;
/**
 * Service wrapper around a php-amqplib channel.
 *
 * Keeps a registry of consumers keyed by their consumer tag, converts incoming
 * AMQP messages into client messages and routes them to the matching consumer,
 * and exposes helpers for ack / reject / publish and QoS configuration.
 */
class AmqpChannelWrapper extends ServiceSupport
{
    /**
     * Timeout passed to the channel wait() call when none was configured
     * (presumably seconds - confirm against php-amqplib).
     */
    const DEFAULT_WAIT_TIMEOUT = 1;

    /** @var AMQPChannel wrapped low-level channel */
    private $channel;

    /** @var AmqpMessageConverter converts between AMQPMessage and Message */
    private $messageConverter;

    /** @var AmqpProcessConfirmStrategy invoked after a consumer has processed an exchange */
    private $processConfirmStrategy;

    /** @var int timeout handed to the channel wait() call in doTick() */
    private $waitTimeout = self::DEFAULT_WAIT_TIMEOUT;

    /** @var AmqpConsumer[] registered consumers indexed by consumer tag */
    private $consumers = [];

    /**
     * Creates the wrapper with a default message converter and a basic
     * process-confirm strategy; both can be replaced through their setters.
     *
     * @param AMQPChannel $channel channel to wrap
     */
    public function __construct(AMQPChannel $channel)
    {
        $this->channel = $channel;
        $this->messageConverter = new AmqpMessageConverter();
        $this->processConfirmStrategy = new BasicAmqpProcessConfirmStrategy();
    }

    /**
     * Starts consuming on the wrapped channel for the given consumer and stores
     * the consumer under the tag returned by the channel.
     *
     * The returned tag is also written back into the consumer options.
     *
     * @param AmqpConsumer $consumer consumer to register
     */
    public function registerConsumer(AmqpConsumer $consumer)
    {
        $options = $consumer->getOptions();
        $this->getLogger()->debug("registering consumer", ["options" => $options]);

        $callback = [$this, "onMessage"];
        $consumerTag = $this->getWrappedChannel()->basic_consume(
            $options->getQueueName(),
            $options->getTag(),
            false, // no_local
            $options->isAutoAckMode(),
            $options->isExclusive(),
            false, // nowait
            $callback,
            null, // ticket
            $options->getConsumeArguments()
        );

        $options->setTag($consumerTag);
        $this->getLogger()->debug("consumer tag: {$consumerTag}", ["options" => $options]);
        $this->consumers[$consumerTag] = $consumer;
    }

    /**
     * Cancels the consumer on the wrapped channel and removes it from the registry.
     *
     * @param string $consumerTag tag of a previously registered consumer
     * @throws \InvalidArgumentException when no consumer is registered under the tag
     */
    public function unregisterConsumer(string $consumerTag)
    {
        $registered = $this->getConsumer($consumerTag);
        $channelTag = $registered->getOptions()->getTag();
        $this->getWrappedChannel()->basic_cancel($channelTag, false, true);
        unset($this->consumers[$consumerTag]);
    }

    /**
     * Channel callback for delivered messages.
     *
     * Converts the raw message, wraps it in an exchange, lets the consumer
     * identified by the message's consumer-tag header process it and finally
     * delegates confirmation to the configured confirm strategy.
     *
     * @param AMQPMessage $in raw message delivered by the channel
     * @throws \InvalidArgumentException when the consumer tag is not registered
     */
    public function onMessage(AMQPMessage $in)
    {
        $message = $this->getMessageConverter()->convertFrom($in);
        $this->getLogger()->debug('onMessage', ['message' => $message]);

        $tag = $message->getHeader(AmqpMessageHeaders::CONSUMER_TAG);
        $exchange = BasicExchange::withIn($message);
        $this->getConsumer($tag)->process($exchange);

        $this->getProcessConfirmStrategy()->confirm($this, $message, $exchange);
    }

    /**
     * Performs a single wait() on the wrapped channel using the configured timeout.
     *
     * A timeout is not treated as an error; it is only logged at debug level.
     * Presumably invoked by ServiceSupport on every service tick - confirm.
     */
    protected function doTick()
    {
        try {
            $channel = $this->getWrappedChannel();
            $channel->wait(null, true, $this->getWaitTimeout());
        } catch (AMQPTimeoutException $timeout) {
            $this->getLogger()->debug(
                "channel wait timeout: " . $timeout->getMessage(),
                ["exception" => $timeout]
            );
        }
    }

    /**
     * Acknowledges the message identified by its delivery-tag header.
     *
     * @param Message $message message carrying the delivery-tag header
     */
    public function ack(Message $message)
    {
        $this->getLogger()->debug("ack", ["message" => $message]);
        $tag = $message->getHeader(AmqpMessageHeaders::DELIVERY_TAG);
        $channel = $this->getWrappedChannel();
        $channel->basic_ack($tag, false);
    }

    /**
     * Rejects the message identified by its delivery-tag header.
     *
     * @param Message $message message carrying the delivery-tag header
     * @param bool $requeue forwarded to the channel's basic_reject call
     */
    public function reject(Message $message, bool $requeue = true)
    {
        $this->getLogger()->debug("reject", ["message" => $message, "requeue" => $requeue]);
        $tag = $message->getHeader(AmqpMessageHeaders::DELIVERY_TAG);
        $channel = $this->getWrappedChannel();
        $channel->basic_reject($tag, $requeue);
    }

    /**
     * Converts the message and publishes it on the wrapped channel, taking the
     * target exchange and routing key from the message headers.
     *
     * @param Message $message message to publish
     */
    public function publish(Message $message)
    {
        $this->getLogger()->debug("publish", ["message" => $message]);

        $amqpMessage = $this->messageConverter->convertTo($message);
        $targetExchange = $message->getHeader(AmqpMessageHeaders::EXCHANGE);
        $key = $message->getHeader(AmqpMessageHeaders::ROUTING_KEY);

        $this->getWrappedChannel()->basic_publish($amqpMessage, $targetExchange, $key);
    }

    /**
     * Calls basic_qos on the wrapped channel with the global flag set to true.
     *
     * @param int $count prefetch count
     * @param int $size prefetch size
     */
    public function setChannelPrefetchCount(int $count, int $size = 0)
    {
        $channel = $this->getWrappedChannel();
        $channel->basic_qos($size, $count, true);
    }

    /**
     * Calls basic_qos on the wrapped channel with the global flag set to false.
     *
     * @param int $count prefetch count
     * @param int $size prefetch size
     */
    public function setPrefetchCountPerConsumer(int $count, int $size = 0)
    {
        $channel = $this->getWrappedChannel();
        $channel->basic_qos($size, $count, false);
    }

    /**
     * @return int timeout used for the channel wait() call
     */
    public function getWaitTimeout(): int
    {
        return $this->waitTimeout;
    }

    /**
     * @param int $timeout timeout to use for the channel wait() call
     */
    public function setWaitTimeout(int $timeout)
    {
        $this->waitTimeout = $timeout;
    }

    /**
     * Looks up a registered consumer.
     *
     * @param string $consumerTag tag of the consumer
     * @return AmqpConsumer consumer registered under the tag
     * @throws \InvalidArgumentException when no consumer is registered under the tag
     */
    public function getConsumer(string $consumerTag): AmqpConsumer
    {
        if (!$this->hasConsumer($consumerTag)) {
            throw new \InvalidArgumentException(
                sprintf("consumer with tag: '%s' is not registered", $consumerTag)
            );
        }

        return $this->consumers[$consumerTag];
    }

    /**
     * @param string $consumerTag tag of the consumer
     * @return bool true when a consumer is registered under the tag
     */
    public function hasConsumer(string $consumerTag)
    {
        return isset($this->consumers[$consumerTag]);
    }

    /**
     * @return AmqpConsumer[] registered consumers indexed by consumer tag
     */
    public function getConsumers(): array
    {
        return $this->consumers;
    }

    /**
     * @return AMQPChannel the wrapped low-level channel
     */
    public function getWrappedChannel(): AMQPChannel
    {
        return $this->channel;
    }

    /**
     * @return AmqpMessageConverter converter currently in use
     */
    public function getMessageConverter(): AmqpMessageConverter
    {
        return $this->messageConverter;
    }

    /**
     * @param AmqpMessageConverter $messageConverter converter to use from now on
     */
    public function setMessageConverter(AmqpMessageConverter $messageConverter)
    {
        $this->messageConverter = $messageConverter;
    }

    /**
     * @return AmqpProcessConfirmStrategy strategy invoked after message processing
     */
    public function getProcessConfirmStrategy(): AmqpProcessConfirmStrategy
    {
        return $this->processConfirmStrategy;
    }

    /**
     * @param AmqpProcessConfirmStrategy $processConfirmStrategy strategy to use from now on
     */
    public function setProcessConfirmStrategy(AmqpProcessConfirmStrategy $processConfirmStrategy)
    {
        $this->processConfirmStrategy = $processConfirmStrategy;
    }
}