1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: 19: 20: 21: 22: 23: 24: 25: 26: 27: 28: 29: 30: 31: 32: 33: 34: 35: 36: 37: 38: 39: 40: 41: 42: 43: 44: 45: 46: 47: 48: 49: 50: 51: 52: 53: 54: 55: 56:
<?php
namespace SAREhub\Client\Amqp;
use PhpAmqpLib\Message\AMQPMessage;
use SAREhub\Client\Amqp\AmqpMessageHeaders as AMH;
use SAREhub\Client\Message\BasicMessage;
use SAREhub\Client\Message\Message;
/**
 * Converts between php-amqplib AMQPMessage objects and this client's
 * Message objects, mapping AMQP delivery info and properties to headers.
 */
class AmqpMessageConverter
{
    /**
     * Builds a client message from an AMQP message.
     *
     * The body is copied as-is. Every header listed below is always present
     * in the result; a header whose value is not available on the AMQP
     * message is set to null.
     *
     * Delivery info fields are read through the same guarded lookup as the
     * optional properties, because AMQPMessage::get() throws
     * \OutOfBoundsException for a name that is not set. That would otherwise
     * abort the conversion for messages lacking some of these fields (e.g. a
     * message without a consumer tag, or one that was created locally).
     *
     * @param AMQPMessage $message source AMQP message
     * @return BasicMessage result of the fluent setBody()/setHeaders() chain
     */
    public function convertFrom(AMQPMessage $message)
    {
        return BasicMessage::newInstance()
            ->setBody($message->getBody())
            ->setHeaders([
                // Delivery info.
                AMH::CONSUMER_TAG => $this->extractProperty('consumer_tag', $message),
                AMH::DELIVERY_TAG => $this->extractProperty('delivery_tag', $message),
                AMH::REDELIVERED => $this->extractProperty('redelivered', $message),
                AMH::EXCHANGE => $this->extractProperty('exchange', $message),
                AMH::ROUTING_KEY => $this->extractProperty('routing_key', $message),
                // Optional message properties.
                AMH::CONTENT_TYPE => $this->extractProperty('content_type', $message),
                AMH::CONTENT_ENCODING => $this->extractProperty('content_encoding', $message),
                AMH::REPLY_TO => $this->extractProperty('reply_to', $message),
                AMH::CORRELATION_ID => $this->extractProperty('correlation_id', $message),
                AMH::PRIORITY => $this->extractProperty('priority', $message),
            ]);
    }

    /**
     * Reads a named value from the AMQP message without throwing.
     *
     * @param string $name name passed to AMQPMessage::has()/get()
     * @param AMQPMessage $message message to read from
     * @return mixed|null the value, or null when has() reports it as absent
     */
    private function extractProperty($name, AMQPMessage $message)
    {
        return $message->has($name) ? $message->get($name) : null;
    }

    /**
     * Builds an AMQP message from a client message.
     *
     * Only the headers returned by AmqpMessageHeaders::getPropertyHeaders()
     * are considered. Each one that the message actually has is copied into
     * the AMQP properties under the name given by getMessagePropertyName();
     * all other headers are ignored.
     *
     * @param Message $message source client message
     * @return AMQPMessage message carrying the same body and mapped properties
     */
    public function convertTo(Message $message)
    {
        $properties = [];
        foreach (AMH::getPropertyHeaders() as $header) {
            if ($message->hasHeader($header)) {
                $properties[AMH::getMessagePropertyName($header)] = $message->getHeader($header);
            }
        }

        return new AMQPMessage($message->getBody(), $properties);
    }
}