Skip to content

Commit

Permalink
Refactor and add tests for Connection.php (#28)
Browse files Browse the repository at this point in the history
Refactor and add tests for Connection.php
  • Loading branch information
senaranya authored Jul 4, 2020
1 parent 2d5eb3e commit fc7e6bd
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 85 deletions.
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
},
"require-dev": {
"phpunit/phpunit": "^8",
"dms/phpunit-arraysubset-asserts": "^0.1.0"
"dms/phpunit-arraysubset-asserts": "^0.1.0",
"ext-pcntl": "*"
},
"autoload": {
"psr-4": {
Expand Down
37 changes: 26 additions & 11 deletions src/HL7/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
namespace Aranyasen\HL7;

use Aranyasen\Exceptions\HL7ConnectionException;
use Aranyasen\Exceptions\HL7Exception;
use Exception;
use ReflectionException;

/**
* Usage:
Expand Down Expand Up @@ -63,7 +65,7 @@ public function __construct(string $host, int $port, int $timeout = 10)
* @param int $timeout Connection timeout
* @throws HL7ConnectionException
*/
protected function setSocket(string $host, int $port, int $timeout = 10)
protected function setSocket(string $host, int $port, int $timeout = 10): void
{
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if (!$socket || !is_resource($socket)) {
Expand Down Expand Up @@ -112,20 +114,25 @@ protected function throwSocketError(string $message): void
/**
* Sends a Message object over this connection.
*
* @param Message $req
* @param Message $msg
* @param string $responseCharEncoding The expected character encoding of the response.
* @return Message
* @param bool $noWait Do no wait for ACK. Helpful for building load testing tools...
* @return Message|null
* @throws HL7ConnectionException
* @throws HL7Exception
* @throws ReflectionException
* @access public
* @throws \Exception
* @throws \InvalidArgumentException
*/
public function send(Message $req, string $responseCharEncoding = 'UTF-8'): Message
public function send(Message $msg, string $responseCharEncoding = 'UTF-8', bool $noWait = false): ?Message
{
$hl7Msg = $req->toString(true);

$message = $this->MESSAGE_PREFIX . $hl7Msg . $this->MESSAGE_SUFFIX;
$message = $this->MESSAGE_PREFIX . $msg->toString(true) . $this->MESSAGE_SUFFIX; // As per MLLP protocol
if (!socket_write($this->socket, $message, strlen($message))) {
throw new HL7Exception("Could not send data to server: " . socket_strerror(socket_last_error()));
}

socket_write($this->socket, $message, \strlen($message)) or die("Could not send data to server\n");
if ($noWait) {
return null;
}

$data = null;

Expand All @@ -144,7 +151,7 @@ public function send(Message $req, string $responseCharEncoding = 'UTF-8'): Mess
throw new HL7ConnectionException("No response received within {$this->timeout} seconds");
}

// Remove message prefix and suffix
// Remove message prefix and suffix added by the MLLP server
$data = preg_replace('/^' . $this->MESSAGE_PREFIX . '/', '', $data);
$data = preg_replace('/' . $this->MESSAGE_SUFFIX . '$/', '', $data);

Expand All @@ -154,6 +161,14 @@ public function send(Message $req, string $responseCharEncoding = 'UTF-8'): Mess
return new Message($data, null, true, true);
}

/*
* Return the socket opened/used by this class
*/
public function getSocket()
{
return $this->socket;
}

/**
* Close the socket
* TODO: Close only when the socket is open
Expand Down
129 changes: 56 additions & 73 deletions tests/ConnectionTest.php
Original file line number Diff line number Diff line change
@@ -1,92 +1,75 @@
<?php
declare(strict_types=1);

namespace Aranyasen\HL7\tests;
namespace Aranyasen\HL7\Tests;

use Aranyasen\Exceptions\HL7ConnectionException;
use Aranyasen\Exceptions\HL7Exception;
use Aranyasen\HL7\Message;
use Aranyasen\HL7\Connection;
use Aranyasen\HL7\Segment;
use Aranyasen\HL7\Segments\MSH;
use RuntimeException;

class ConnectionTest extends TestCase
{
public function test()
{
$this->markTestIncomplete();

$msg = new Message();
$msg->addSegment(new MSH());

$seg1 = new Segment('PID');

$seg1->setField(3, 'XXX');
use Hl7ListenerTrait;

$msg->addSegment($seg1);
protected $port = 12011;

// If you have fork support, try this...

// $pid = pcntl_fork();
//
// if (! $pid) {
// // Server process
// set_time_limit(0);
//
// // Turn on implicit output flushing so we see what we're getting
// // as it comes in.
// ob_implicit_flush();
//
// if (($sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) < 0) {
// echo 'socket_create() failed: reason: ' . socket_strerror($sock) . "\n";
// }
//
// if (($ret = socket_bind($sock, "localhost", 12011)) < 0) {
// echo 'socket_bind() failed: reason: ' . socket_strerror($ret) . "\n";
// }
//
// if (($ret = socket_listen($sock, 5)) < 0) {
// echo 'socket_listen() failed: reason: ' . socket_strerror($ret) . "\n";
// }
//
// do {
// if (($msgsock = socket_accept($sock)) < 0) {
// echo 'socket_accept() failed: reason: ' . socket_strerror($msgsock) . "\n";
// break;
// }
//
// if (false === ($buf = socket_read($msgsock, 8192, PHP_NORMAL_READ))) {
// echo 'socket_read() failed: reason: ' . socket_strerror($ret) . "\n";
// break 2;
// }
//
// echo "Incoming: $buf\n";
//
// $msg = new Message($buf);
//
// $ack = new ACK($msg);
// socket_write($msgsock, "\013" . $ack->toString() . "\034\015", \strlen($ack->toString() + 3));
// echo 'Said: ' . $ack->toString(1) . "\n";
//
// } while (true);
//
// socket_close($msgsock);
//
// exit;
// }
/**
* @test
* @throws HL7ConnectionException
* @throws HL7Exception
* @throws \ReflectionException
*/
public function a_message_can_be_sent_to_a_hl7_server(): void
{
$pid = pcntl_fork();
if ($pid === -1) {
throw new RuntimeException('Could not fork');
}
if (!$pid) { // In child process
$this->createTcpServer($this->port, 1);
}
if ($pid) { // in Parent process...
sleep(2); // Give a second to server (child) to start up. TODO: Speed up by polling

$socket = $this->getMock('Net_Socket');
$connection = new Connection('localhost', $this->port);
$msg = new Message("MSH|^~\\&|1|\rPV1|1|O|^AAAA1^^^BB|", null, true, true);
$ack = $connection->send($msg);
$this->assertInstanceOf(Message::class, $ack);
$this->assertSame('MSH|^~\&|1||||||ACK|\nMSA|AA|\n|\n', $ack->toString());

$socket->expects($this->once())
->method('write')
->with("\013" . $msg->toString() . "\034\015");
$this->assertStringContainsString("MSH|^~\\&|1|\nPV1|1|O|^AAAA1^^^BB|", $this->getWhatServerGot());

$socket->expects($this->once())
->method('read')
->will($this->returnValue("MSH*^~\\&*1\rPID***xxx\r" . "\034\015"));
$this->closeTcpSocket($connection->getSocket()); // Clean up listener
pcntl_wait($status); // Wait till child is closed
}
}

$conn = new Connection($socket);
/**
* @test
* @throws HL7ConnectionException
* @throws HL7Exception
* @throws \ReflectionException
*/
public function do_not_wait_for_ack_after_sending_if_corresponding_parameter_is_set(): void
{
$pid = pcntl_fork();
if ($pid === -1) {
throw new RuntimeException('Could not fork');
}
if (!$pid) { // In child process
$this->createTcpServer($this->port, 1);
}
if ($pid) { // in Parent process...
sleep(2); // Give a second to server (child) to start up

$resp = $conn->send($msg);
$connection = new Connection('localhost', $this->port);
$msg = new Message("MSH|^~\\&|1|\rPV1|1|O|^AAAA1^^^BB|", null, true, true);
$this->assertNull($connection->send($msg,' UTF-8', true));

$this->assertInstanceOf(Message::class, $resp);
$this->closeTcpSocket($connection->getSocket()); // Clean up listener
pcntl_wait($status); // Wait till child is closed
}
}
}
139 changes: 139 additions & 0 deletions tests/Hl7ListenerTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
<?php

namespace Aranyasen\HL7\Tests;

use Aranyasen\Exceptions\HL7Exception;
use Aranyasen\HL7\Message;
use Aranyasen\HL7\Messages\ACK;

/**
* Trait Hl7ListenerTrait
*
* Create a TCP socket server to receive HL7 messages. It responds to HL7 messages with an ACK
* It also creates a pipe so client can get back exactly what it sent. Useful for testing...
* To close the server, send "\n" or "shutdown\n"
* @package Aranyasen\HL7\Tests
*/
trait Hl7ListenerTrait
{
private $pipeName = "pipe1";

// As per MLLP protocol, the sender prefixes and suffixes the HL7 message with certain codes. If these need to be
// overwritten, simply declare these after the 'use Hl7ListenerTrait' statement in the calling class
protected $MESSAGE_PREFIX = "\013";
protected $MESSAGE_SUFFIX = "\034\015";

public function writeToPipe(string $value): void
{
$pipe = fopen($this->pipeName,'wb');
fwrite($pipe, $value);
}

public function readFromPipe(): string
{
$pipe = fopen($this->pipeName,'rb');
return fread($pipe, 1024);
}

public function getWhatServerGot(): string
{
return $this->readFromPipe();
}

/**
* @param int $port
* @param int $totalClientsToConnect How many clients are expected to connect to this server, once it's up
* @throws HL7Exception
* @throws \ReflectionException
*/
public function createTcpServer(int $port, int $totalClientsToConnect): void
{
if (($socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) < 0) {
throw new \RuntimeException('socket_create() failed: reason: ' . socket_strerror(socket_last_error()) . "\n");
}

// This is to avoid "address already in use" error while doing ->bind()
if (!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
echo socket_strerror(socket_last_error($socket));
exit(-1);
}

if (($ret = socket_bind($socket, "localhost", $port)) < 0) {
throw new \RuntimeException('socket_bind() failed: reason: ' . socket_strerror($ret) . "\n");
}
if (($ret = socket_listen($socket, 5)) < 0) {
throw new \RuntimeException('socket_listen() failed: reason: ' . socket_strerror($ret) . "\n");
}

$clientCount = 0;
while (true) { // Loop over each client
if (($clientSocket = socket_accept($socket)) < 0) {
echo 'socket_accept() failed: reason: ' . socket_strerror(socket_last_error()) . "\n";
socket_close($clientSocket);
exit();
}
if ($clientSocket === false) {
continue;
}

$clientCount++;
$clientName = 'Unknown';
socket_getpeername($clientSocket, $clientName);
// echo "Client {$clientCount} ({$clientName}) connected\n"; // Uncomment to debug

while (true) { // Keep reading a given client until they send "shutdown" or an empty string
$buffer = socket_read($clientSocket, 1024); // Keeps reading until bytes exhaust, /n or /r
if (false === $buffer) {
break;
}
// echo "\n--- From client: '$buffer' ---\n\n"; // Uncomment to debug
if (!$buffer || empty(trim($buffer)) || false !== stripos($buffer, 'shutdown')) {
break;
}

$ackString = $this->getAckString($buffer);
$message = $this->MESSAGE_PREFIX . $ackString . $this->MESSAGE_SUFFIX;
socket_write($clientSocket, $message, strlen($message));

// Also write to a pipe/msg queue for client to get the actual message
$this->writeToPipe($buffer);
}

socket_shutdown($clientSocket);
socket_close($clientSocket);

if ($totalClientsToConnect > 0 && $clientCount >= $totalClientsToConnect) {
break;
}
}
socket_close($socket);
exit(0); // Child process needs it
}

/**
* @param $socket
*/
public function closeTcpSocket($socket): void
{
$msg = "\n"; // Or send "shutdown\n"
socket_write($socket, $msg, strlen($msg)); // Tell the client to shutdown
}

/**
* @param string $hl7
* @return string ACK string
* @throws HL7Exception
* @throws \ReflectionException
*/
private function getAckString(string $hl7): string
{
// Remove message prefix and suffix
$hl7 = preg_replace('/^' . $this->MESSAGE_PREFIX . '/', '', $hl7);
$hl7 = preg_replace('/' . $this->MESSAGE_SUFFIX . '$/', '', $hl7);

$msg = new Message(trim($hl7), null, true, true);
$ack = new ACK($msg);
return $ack->toString();
}
// TODO: This trait leaves a file pipe1 behind. Clean it up: in tearDown: unlink($this->>pipe)
}

0 comments on commit fc7e6bd

Please sign in to comment.