Skip to content

Commit

Permalink
Merge pull request #128 from mermshaus/issue-125
Browse files Browse the repository at this point in the history
Add test to illustrate #125 (stream_select/interrupt issue). Also a fix
  • Loading branch information
WyriHaximus authored Nov 30, 2022
2 parents aba52e4 + 1f4bcc9 commit 5b74a0f
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 70 deletions.
5 changes: 4 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,21 @@
],
"require": {
"php": "^7.1 || ^8.0",
"ext-pcntl": "*",
"react/event-loop": "^1.0 || ^0.5 || ^0.4",
"react/promise": "~2.2"
},
"require-dev": {
"phpunit/phpunit": "^9.5 || ^7.5.20"
"phpunit/phpunit": "^9.5 || ^7.5.20",
"symfony/process": "^6.1 || ^4.4"
},
"autoload": {
"psr-4": {
"Bunny\\": "src/Bunny/"
}
},
"autoload-dev": {
"files": ["test/Library/functions.php"],
"psr-4": {
"Bunny\\": "test/Bunny/",
"Bunny\\Test\\Library\\": "test/Library/"
Expand Down
3 changes: 3 additions & 0 deletions docker/bunny/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ RUN apt-get update \
vim \
zip

RUN docker-php-ext-configure pcntl --enable-pcntl \
&& docker-php-ext-install pcntl

RUN pecl install xdebug \
&& docker-php-ext-enable xdebug
RUN echo "xdebug.mode=coverage" >> /usr/local/etc/php/conf.d/docker-php-ext-xdebug.ini
Expand Down
5 changes: 4 additions & 1 deletion src/Bunny/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,11 @@ public function run($maxSeconds = null)

if (($n = @stream_select($r, $w, $e, $tvSec, $tvUsec)) === false) {
$lastError = error_get_last();

// Note: The word "Unable" within the stream_select error message was spelled "unable" in PHP
// versions < 8.
if ($lastError !== null &&
preg_match("/^stream_select\\(\\): unable to select \\[(\\d+)\\]:/", $lastError["message"], $m) &&
preg_match("/^stream_select\\(\\): [Uu]nable to select \\[(\\d+)\\]:/", $lastError["message"], $m) &&
intval($m[1]) === PCNTL_EINTR
) {
// got interrupted by signal, dispatch signals & continue
Expand Down
35 changes: 35 additions & 0 deletions test/Bunny/ClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@
use Bunny\Exception\ClientException;
use Bunny\Protocol\MethodBasicAckFrame;
use Bunny\Protocol\MethodBasicReturnFrame;
use Bunny\Test\Library\Environment;
use Bunny\Test\Library\Paths;
use Bunny\Test\Library\SynchronousClientHelper;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Process\Exception\ProcessFailedException;
use Symfony\Component\Process\Process;

use const SIGINT;

class ClientTest extends TestCase
{
Expand Down Expand Up @@ -127,6 +133,35 @@ public function testDisconnectWithBufferedMessages()
$channel->queueDelete("disconnect_test");
}

/**
* Spawns an external consumer process, and tries to stop it with SIGINT.
*/
public function testStopConsumerWithSigInt()
{
$queueName = 'stop-consumer-with-sigint';

$path = Paths::getTestsRootPath() . '/scripts/bunny-consumer.php';

$process = new Process([$path, Environment::getTestRabbitMqConnectionUri(), $queueName, '0']);

$process->start();

$signalSent = false;
$starttime = microtime(true);

// Send SIGINT after 1.0 seconds
while ($process->isRunning()) {
if (!$signalSent && microtime(true) > $starttime + 1.0) {
$process->signal(SIGINT);
$signalSent = true;
}

usleep(10000);
}

self::assertTrue($process->isSuccessful(), $process->getOutput() . "\n" . $process->getErrorOutput());
}

public function testGet()
{
$client = $this->helper->createClient();
Expand Down
66 changes: 0 additions & 66 deletions test/Library/AbstractClientHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,70 +6,4 @@

abstract class AbstractClientHelper
{
/**
* @param string $uri
*
* @return array
*/
protected function parseAmqpUri($uri): array
{
$uriComponents = parse_url($uri);

if (
!isset($uriComponents['scheme'])
|| !in_array($uriComponents['scheme'], ['amqp', 'amqps'])
) {
throw new \RuntimeException(
sprintf(
'URI scheme must be "amqp" or "amqps". URI given: "%s"',
$uri
)
);
}

$options = [];

if (isset($uriComponents['host'])) {
$options['host'] = $uriComponents['host'];
}

if (isset($uriComponents['port'])) {
$options['port'] = $uriComponents['port'];
}

if (isset($uriComponents['user'])) {
$options['user'] = $uriComponents['user'];
}

if (isset($uriComponents['pass'])) {
$options['password'] = $uriComponents['pass'];
}

if (isset($uriComponents['path'])) {
$vhostCandidate = $uriComponents['path'];

if (strpos($vhostCandidate, '/') === 0) {
$vhostCandidate = substr($vhostCandidate, 1);
}

if (strpos($vhostCandidate, '/') !== false) {
throw new \RuntimeException(
sprintf(
'An URI path component that is a valid vhost may not contain unescaped "/" characters. URI given: "%s"',
$uri
)
);
}

$vhostCandidate = rawurldecode($vhostCandidate);

$options['vhost'] = $vhostCandidate;
}

if ($options['vhost'] === '') {
$options['vhost'] = '/';
}

return $options;
}
}
2 changes: 1 addition & 1 deletion test/Library/AsynchronousClientHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public function getDefaultOptions(): array
{
$options = [];

$options = array_merge($options, $this->parseAmqpUri(Environment::getTestRabbitMqConnectionUri()));
$options = array_merge($options, parseAmqpUri(Environment::getTestRabbitMqConnectionUri()));

return $options;
}
Expand Down
13 changes: 13 additions & 0 deletions test/Library/Paths.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

declare(strict_types=1);

namespace Bunny\Test\Library;

final class Paths
{
public static function getTestsRootPath(): string
{
return dirname(__DIR__);
}
}
2 changes: 1 addition & 1 deletion test/Library/SynchronousClientHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public function getDefaultOptions(): array
{
$options = [];

$options = array_merge($options, $this->parseAmqpUri(Environment::getTestRabbitMqConnectionUri()));
$options = array_merge($options, parseAmqpUri(Environment::getTestRabbitMqConnectionUri()));

return $options;
}
Expand Down
67 changes: 67 additions & 0 deletions test/Library/functions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
<?php

declare(strict_types=1);

namespace Bunny\Test\Library;

function parseAmqpUri($uri): array
{
$uriComponents = parse_url($uri);

if (
!isset($uriComponents['scheme'])
|| !in_array($uriComponents['scheme'], ['amqp', 'amqps'])
) {
throw new \RuntimeException(
sprintf(
'URI scheme must be "amqp" or "amqps". URI given: "%s"',
$uri
)
);
}

$options = [];

if (isset($uriComponents['host'])) {
$options['host'] = $uriComponents['host'];
}

if (isset($uriComponents['port'])) {
$options['port'] = $uriComponents['port'];
}

if (isset($uriComponents['user'])) {
$options['user'] = $uriComponents['user'];
}

if (isset($uriComponents['pass'])) {
$options['password'] = $uriComponents['pass'];
}

if (isset($uriComponents['path'])) {
$vhostCandidate = $uriComponents['path'];

if (strpos($vhostCandidate, '/') === 0) {
$vhostCandidate = substr($vhostCandidate, 1);
}

if (strpos($vhostCandidate, '/') !== false) {
throw new \RuntimeException(
sprintf(
'An URI path component that is a valid vhost may not contain unescaped "/" characters. URI given: "%s"',
$uri
)
);
}

$vhostCandidate = rawurldecode($vhostCandidate);

$options['vhost'] = $vhostCandidate;
}

if ($options['vhost'] === '') {
$options['vhost'] = '/';
}

return $options;
}
51 changes: 51 additions & 0 deletions test/scripts/bunny-consumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/usr/bin/env php
<?php

// Usage: bunny-consumer.php <amqp-uri> <queue-name> <max-seconds>

declare(strict_types=1);

namespace Bunny\Test\App;

use Bunny\Channel;
use Bunny\Client;
use Bunny\Message;

use function Bunny\Test\Library\parseAmqpUri;

require __DIR__ . '/../../vendor/autoload.php';

function app(array $args)
{
$connection = parseAmqpUri($args['amqpUri']);

$client = new Client($connection);

pcntl_signal(SIGINT, function () use ($client) {
$client->disconnect()->done(function () use ($client) {
$client->stop();
});
});

$client->connect();
$channel = $client->channel();

$channel->qos(0, 1);
$channel->queueDeclare($args['queueName']);
$channel->consume(function (Message $message, Channel $channel) use ($client) {
$channel->ack($message);
});
$client->run($args['maxSeconds'] > 0 ? $args['maxSeconds'] : null);
}

$argv_copy = $argv;

array_shift($argv_copy);

$args = [
'amqpUri' => array_shift($argv_copy),
'queueName' => array_shift($argv_copy),
'maxSeconds' => (int) array_shift($argv_copy),
];

app($args);

0 comments on commit 5b74a0f

Please sign in to comment.