-
Notifications
You must be signed in to change notification settings - Fork 3
/
Driver.php
107 lines (83 loc) · 2.64 KB
/
Driver.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
<?php
declare(strict_types=1);
namespace Bernard\Driver\QueueInterop;
use Bernard\Driver\Message;
use Interop\Amqp\AmqpContext;
use Interop\Amqp\AmqpQueue;
use Interop\Queue\Consumer;
use Interop\Queue\Context;
final class Driver implements \Bernard\Driver
{
/**
* @var Consumer[]
*/
private array $consumers = [];
public function __construct(private Context $context)
{
}
public function listQueues(): array
{
return [];
}
public function createQueue(string $queueName): void
{
if ($this->context instanceof AmqpContext) {
$this->context->declareQueue($this->createAmqpQueue($queueName));
}
}
public function removeQueue(string $queueName): void
{
if ($this->context instanceof AmqpContext) {
$queue = $this->createAmqpQueue($queueName);
$this->context->deleteQueue($queue);
}
}
public function pushMessage(string $queueName, string $message): void
{
$queue = $this->context->createQueue($queueName);
$message = $this->context->createMessage($message);
$this->context->createProducer()->send($queue, $message);
}
public function popMessage(string $queueName, int $duration = 5): ?Message
{
if ($message = $this->getQueueConsumer($queueName)->receive($duration * 1000)) {
return new Message($message->getBody(), $message);
}
return null;
}
public function acknowledgeMessage(string $queueName, mixed $receipt): void
{
$this->getQueueConsumer($queueName)->acknowledge($receipt);
}
public function info(): array
{
return [];
}
public function countMessages(string $queueName): int
{
if ($this->context instanceof AmqpContext) {
return $this->context->declareQueue($this->createAmqpQueue($queueName));
}
return 0;
}
public function peekQueue(string $queueName, int $index = 0, int $limit = 20): array
{
return [];
}
private function getQueueConsumer(string $queueName): Consumer
{
if (\array_key_exists($queueName, $this->consumers) === false) {
$queue = $this->context->createQueue($queueName);
$this->consumers[$queueName] = $this->context->createConsumer($queue);
}
return $this->consumers[$queueName];
}
private function createAmqpQueue(string $queueName): AmqpQueue
{
/** @var AmqpContext $context */
$context = $this->context;
$queue = $context->createQueue($queueName);
$queue->addFlag(AmqpQueue::FLAG_DURABLE);
return $queue;
}
}