1:   2:   3:   4:   5:   6:   7:   8:   9:  10:  11:  12:  13:  14:  15:  16:  17:  18:  19:  20:  21:  22:  23:  24:  25:  26:  27:  28:  29:  30:  31:  32:  33:  34:  35:  36:  37:  38:  39:  40:  41:  42:  43:  44:  45:  46:  47:  48:  49:  50:  51:  52:  53:  54:  55:  56:  57:  58:  59:  60:  61:  62:  63:  64:  65:  66:  67:  68:  69:  70:  71:  72:  73:  74:  75:  76:  77:  78:  79:  80:  81:  82:  83:  84:  85:  86:  87:  88:  89:  90:  91:  92:  93:  94:  95:  96:  97:  98:  99: 100: 101: 102: 103: 104: 105: 106: 107: 108: 109: 110: 111: 112: 113: 114: 115: 116: 117: 118: 119: 120: 121: 122: 123: 124: 125: 126: 127: 128: 129: 130: 131: 132: 133: 134: 135: 136: 137: 138: 139: 140: 141: 142: 143: 144: 145: 146: 147: 148: 149: 150: 151: 152: 153: 154: 155: 156: 157: 158: 159: 160: 161: 162: 163: 164: 165: 166: 167: 168: 169: 170: 171: 172: 173: 174: 175: 176: 177: 178: 179: 180: 181: 182: 183: 184: 
<?php

namespace SAREhub\Client\Amqp;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Message\AMQPMessage;
use SAREhub\Client\Message\BasicExchange;
use SAREhub\Client\Message\Message;
use SAREhub\Commons\Service\ServiceSupport;

class AmqpChannelWrapper extends ServiceSupport
{
    const DEFAULT_WAIT_TIMEOUT = 1;

    /**
     * @var AMQPChannel
     */
    private $channel;

    /**
     * @var AmqpMessageConverter
     */
    private $messageConverter;

    /**
     * @var AmqpProcessConfirmStrategy
     */
    private $processConfirmStrategy;

    /**
     * @var int
     */
    private $waitTimeout = self::DEFAULT_WAIT_TIMEOUT;

    /**
     * @var AmqpConsumer[]
     */
    private $consumers = [];

    public function __construct(AMQPChannel $channel)
    {
        $this->channel = $channel;
        $this->messageConverter = new AmqpMessageConverter();
        $this->processConfirmStrategy = new BasicAmqpProcessConfirmStrategy();
    }

    public function registerConsumer(AmqpConsumer $consumer)
    {
        $opts = $consumer->getOptions();
        $this->getLogger()->debug("registering consumer", ["options" => $opts]);
        $tag = $this->getWrappedChannel()->basic_consume(
            $opts->getQueueName(),
            $opts->getTag(),
            false,
            $opts->isAutoAckMode(),
            $opts->isExclusive(),
            false,
            [$this, "onMessage"],
            null,
            $opts->getConsumeArguments()
        );
        $opts->setTag($tag);

        $this->getLogger()->debug("consumer tag: $tag", ["options" => $opts]);
        $this->consumers[$tag] = $consumer;
    }

    public function unregisterConsumer(string $consumerTag)
    {
        $consumer = $this->getConsumer($consumerTag);
        $this->getWrappedChannel()->basic_cancel($consumer->getOptions()->getTag(), false, true);
        unset($this->consumers[$consumerTag]);
    }

    public function onMessage(AMQPMessage $in)
    {
        $inConverted = $this->getMessageConverter()->convertFrom($in);
        $this->getLogger()->debug('onMessage', ['message' => $inConverted]);

        $consumerTag = $inConverted->getHeader(AmqpMessageHeaders::CONSUMER_TAG);
        $exchange = BasicExchange::withIn($inConverted);
        $consumer = $this->getConsumer($consumerTag);
        $consumer->process($exchange);
        $this->getProcessConfirmStrategy()->confirm($this, $inConverted, $exchange);
    }

    protected function doTick()
    {
        try {
            $this->getWrappedChannel()->wait(null, true, $this->getWaitTimeout());
        } catch (AMQPTimeoutException $e) {
            $this->getLogger()->debug("channel wait timeout: " . $e->getMessage(), ["exception" => $e]);
        }
    }

    public function ack(Message $message)
    {
        $this->getLogger()->debug("ack", ["message" => $message]);
        $deliveryTag = $message->getHeader(AmqpMessageHeaders::DELIVERY_TAG);
        $this->getWrappedChannel()->basic_ack($deliveryTag, false);
    }

    public function reject(Message $message, bool $requeue = true)
    {
        $this->getLogger()->debug("reject", ["message" => $message, "requeue" => $requeue]);
        $deliveryTag = $message->getHeader(AmqpMessageHeaders::DELIVERY_TAG);
        $this->getWrappedChannel()->basic_reject($deliveryTag, $requeue);
    }

    public function publish(Message $message)
    {
        $this->getLogger()->debug("publish", ["message" => $message]);

        $converted = $this->messageConverter->convertTo($message);
        $exchange = $message->getHeader(AmqpMessageHeaders::EXCHANGE);
        $routingKey = $message->getHeader(AmqpMessageHeaders::ROUTING_KEY);
        $this->getWrappedChannel()->basic_publish($converted, $exchange, $routingKey);
    }

    public function setChannelPrefetchCount(int $count, int $size = 0)
    {
        $this->getWrappedChannel()->basic_qos($size, $count, true);
    }

    public function setPrefetchCountPerConsumer(int $count, int $size = 0)
    {
        $this->getWrappedChannel()->basic_qos($size, $count, false);
    }

    public function getWaitTimeout(): int
    {
        return $this->waitTimeout;
    }

    public function setWaitTimeout(int $timeout)
    {
        $this->waitTimeout = $timeout;
    }

    public function getConsumer(string $consumerTag): AmqpConsumer
    {
        if ($this->hasConsumer($consumerTag)) {
            return $this->consumers[$consumerTag];
        }

        throw new \InvalidArgumentException(sprintf("consumer with tag: '%s' is not registered", $consumerTag));
    }

    public function hasConsumer(string $consumerTag)
    {
        return isset($this->consumers[$consumerTag]);
    }

    public function getConsumers(): array
    {
        return $this->consumers;
    }

    public function getWrappedChannel(): AMQPChannel
    {
        return $this->channel;
    }

    public function getMessageConverter(): AmqpMessageConverter
    {
        return $this->messageConverter;
    }

    public function setMessageConverter(AmqpMessageConverter $messageConverter)
    {
        $this->messageConverter = $messageConverter;
    }

    public function getProcessConfirmStrategy(): AmqpProcessConfirmStrategy
    {
        return $this->processConfirmStrategy;
    }

    public function setProcessConfirmStrategy(AmqpProcessConfirmStrategy $processConfirmStrategy)
    {
        $this->processConfirmStrategy = $processConfirmStrategy;
    }
}