diff --git a/composer.json b/composer.json index 566a918..379d9eb 100644 --- a/composer.json +++ b/composer.json @@ -52,7 +52,7 @@ "sciactive/requirephp": "~1.3", "cboden/ratchet": "0.3.*", "zendframework/zend-log": "2.*", - "devristo/phpws": "dev-master" + "textalk/websocket": "1.0.*" }, "minimum-stability": "dev", "maybe-require-idk": { diff --git a/composer.lock b/composer.lock index b8bddc7..758019a 100644 --- a/composer.lock +++ b/composer.lock @@ -4,8 +4,8 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file", "This file is @generated automatically" ], - "hash": "eea8f094ce2562fa5c50c2608ddcf4e7", - "content-hash": "7543a3766d55263b507e0d4caf1621ac", + "hash": "eca821ae0b0a48cbcf859a26fa75448f", + "content-hash": "98c70ed3392f725755310156f3c20756", "packages": [ { "name": "cboden/ratchet", @@ -86,46 +86,6 @@ "homepage": "https://github.com/container-interop/container-interop", "time": "2017-02-14 19:40:03" }, - { - "name": "devristo/phpws", - "version": "dev-master", - "source": { - "type": "git", - "url": "https://github.com/Devristo/phpws.git", - "reference": "4911fe9b2b7dd57bcc052f7c2539fff6ec71043a" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/Devristo/phpws/zipball/4911fe9b2b7dd57bcc052f7c2539fff6ec71043a", - "reference": "4911fe9b2b7dd57bcc052f7c2539fff6ec71043a", - "shasum": "" - }, - "require": { - "react/socket": "0.4.*", - "react/socket-client": "0.4.*", - "react/stream": "0.4.*", - "zendframework/zend-http": "2.*", - "zendframework/zend-log": "2.*" - }, - "type": "library", - "autoload": { - "psr-0": { - "Devristo\\Phpws\\": "src/" - } - }, - "authors": [ - { - "name": "Devristo", - "email": "chris@devristo.com" - } - ], - "description": "WebSocket Server and Client library for PHP", - "support": { - "source": "https://github.com/Devristo/phpws/tree/master", - "issues": "https://github.com/Devristo/phpws/issues" - }, - "time": "2016-12-21 07:28:56" - }, { "name": "evenement/evenement", "version": "dev-master", @@ -470,77 +430,6 @@ ], "time": "2016-10-10 12:19:37" }, - { - "name": "react/cache", - "version": "v0.4.1", - "source": { - "type": "git", - "url": "https://github.com/reactphp/cache.git", - "reference": "558f614891341b1d817a8cdf9a358948ec49638f" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/reactphp/cache/zipball/558f614891341b1d817a8cdf9a358948ec49638f", - "reference": "558f614891341b1d817a8cdf9a358948ec49638f", - "shasum": "" - }, - "require": { - "php": ">=5.3.0", - "react/promise": "~2.0|~1.1" - }, - "type": "library", - "autoload": { - "psr-4": { - "React\\Cache\\": "src\\" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "description": "Async caching.", - "keywords": [ - "cache" - ], - "time": "2016-02-25 18:17:16" - }, - { - "name": "react/dns", - "version": "v0.4.4", - "source": { - "type": "git", - "url": "https://github.com/reactphp/dns.git", - "reference": "e3064994379cb4d1e0ee3f5b850b6e8748efae3b" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/reactphp/dns/zipball/e3064994379cb4d1e0ee3f5b850b6e8748efae3b", - "reference": "e3064994379cb4d1e0ee3f5b850b6e8748efae3b", - "shasum": "" - }, - "require": { - "php": ">=5.3.0", - "react/cache": "~0.4.0|~0.3.0", - "react/promise": "~2.1|~1.2", - "react/socket": "^0.5 || ^0.4.4" - }, - "type": "library", - "autoload": { - "psr-4": { - "React\\Dns\\": "src" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "description": "Async DNS resolver.", - "keywords": [ - "dns", - "dns-resolver" - ], - "time": "2017-02-13 09:58:13" - }, { "name": "react/event-loop", "version": "0.4.x-dev", @@ -668,48 +557,6 @@ ], "time": "2017-01-26 09:23:38" }, - { - "name": "react/socket-client", - "version": "0.4.x-dev", - "source": { - "type": "git", - "url": "https://github.com/reactphp/socket-client.git", - "reference": "49e730523b73d912e56f7a41f53ed3fc083ae167" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/reactphp/socket-client/zipball/49e730523b73d912e56f7a41f53ed3fc083ae167", - "reference": "49e730523b73d912e56f7a41f53ed3fc083ae167", - "shasum": "" - }, - "require": { - "php": ">=5.4.0", - "react/dns": "0.4.*", - "react/event-loop": "0.4.*", - "react/promise": "~2.0", - "react/stream": "0.4.*" - }, - "type": "library", - "extra": { - "branch-alias": { - "dev-master": "0.4-dev" - } - }, - "autoload": { - "psr-4": { - "React\\SocketClient\\": "src" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "description": "Async connector to open TCP/IP and SSL/TLS based connections.", - "keywords": [ - "Socket" - ], - "time": "2016-12-06 10:54:49" - }, { "name": "react/stream", "version": "v0.4.6", @@ -1150,142 +997,42 @@ "time": "2017-02-12 19:14:59" }, { - "name": "zendframework/zend-escaper", - "version": "dev-develop", + "name": "textalk/websocket", + "version": "1.0.3", "source": { "type": "git", - "url": "https://github.com/zendframework/zend-escaper.git", - "reference": "724402ba0dc7d82eec30dac97efa669e69e0ae7c" + "url": "https://github.com/Textalk/websocket-php.git", + "reference": "ba2e5f9ef7cf24d536d1c864ae74b2a9599b86eb" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/zendframework/zend-escaper/zipball/724402ba0dc7d82eec30dac97efa669e69e0ae7c", - "reference": "724402ba0dc7d82eec30dac97efa669e69e0ae7c", + "url": "https://api.github.com/repos/Textalk/websocket-php/zipball/ba2e5f9ef7cf24d536d1c864ae74b2a9599b86eb", + "reference": "ba2e5f9ef7cf24d536d1c864ae74b2a9599b86eb", "shasum": "" }, - "require": { - "php": ">=5.5" - }, "require-dev": { - "fabpot/php-cs-fixer": "1.7.*", - "phpunit/phpunit": "~4.0" + "phpunit/phpunit": "4.1.*", + "phpunit/phpunit-selenium": "1.3.3", + "satooshi/php-coveralls": "dev-master" }, "type": "library", - "extra": { - "branch-alias": { - "dev-master": "2.5-dev", - "dev-develop": "2.6-dev" - } - }, "autoload": { "psr-4": { - "Zend\\Escaper\\": "src/" + "WebSocket\\": "lib" } }, "notification-url": "https://packagist.org/downloads/", "license": [ - "BSD-3-Clause" - ], - "homepage": "https://github.com/zendframework/zend-escaper", - "keywords": [ - "escaper", - "zf2" - ], - "time": "2016-06-30 19:50:10" - }, - { - "name": "zendframework/zend-http", - "version": "dev-develop", - "source": { - "type": "git", - "url": "https://github.com/zendframework/zend-http.git", - "reference": "7bf4d2417c87de02d57d039bf2f9eaefa3eb7802" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/zendframework/zend-http/zipball/7bf4d2417c87de02d57d039bf2f9eaefa3eb7802", - "reference": "7bf4d2417c87de02d57d039bf2f9eaefa3eb7802", - "shasum": "" - }, - "require": { - "php": "^5.5 || ^7.0", - "zendframework/zend-loader": "^2.5", - "zendframework/zend-stdlib": "^2.5 || ^3.0", - "zendframework/zend-uri": "^2.5", - "zendframework/zend-validator": "^2.5" - }, - "require-dev": { - "phpunit/phpunit": "^4.0", - "zendframework/zend-coding-standard": "~1.0.0", - "zendframework/zend-config": "^2.5" - }, - "type": "library", - "extra": { - "branch-alias": { - "dev-master": "2.6-dev", - "dev-develop": "2.7-dev" - } - }, - "autoload": { - "psr-4": { - "Zend\\Http\\": "src/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "BSD-3-Clause" - ], - "description": "provides an easy interface for performing Hyper-Text Transfer Protocol (HTTP) requests", - "homepage": "https://github.com/zendframework/zend-http", - "keywords": [ - "http", - "zf2" + "MIT" ], - "time": "2017-01-31 14:45:14" - }, - { - "name": "zendframework/zend-loader", - "version": "dev-develop", - "source": { - "type": "git", - "url": "https://github.com/zendframework/zend-loader.git", - "reference": "dcacfc3bd1cb0721409d62dc309f5d29eb1f4631" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/zendframework/zend-loader/zipball/dcacfc3bd1cb0721409d62dc309f5d29eb1f4631", - "reference": "dcacfc3bd1cb0721409d62dc309f5d29eb1f4631", - "shasum": "" - }, - "require": { - "php": "^5.5 || ^7.0" - }, - "require-dev": { - "fabpot/php-cs-fixer": "1.7.*", - "phpunit/phpunit": "^4.8" - }, - "type": "library", - "extra": { - "branch-alias": { - "dev-master": "2.5-dev", - "dev-develop": "2.6-dev" - } - }, - "autoload": { - "psr-4": { - "Zend\\Loader\\": "src/" + "authors": [ + { + "name": "Fredrik Liljegren", + "email": "fredrik.liljegren@textalk.se" } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "BSD-3-Clause" ], - "homepage": "https://github.com/zendframework/zend-loader", - "keywords": [ - "loader", - "zf2" - ], - "time": "2016-05-05 14:59:09" + "description": "WebSocket client and server", + "time": "2015-04-11 05:45:54" }, { "name": "zendframework/zend-log", @@ -1463,132 +1210,13 @@ "zf2" ], "time": "2016-09-13 14:40:02" - }, - { - "name": "zendframework/zend-uri", - "version": "dev-develop", - "source": { - "type": "git", - "url": "https://github.com/zendframework/zend-uri.git", - "reference": "ed906be3b402020fca728bd0f76519ef6dd02ab1" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/zendframework/zend-uri/zipball/ed906be3b402020fca728bd0f76519ef6dd02ab1", - "reference": "ed906be3b402020fca728bd0f76519ef6dd02ab1", - "shasum": "" - }, - "require": { - "php": "^5.5 || ^7.0", - "zendframework/zend-escaper": "^2.5", - "zendframework/zend-validator": "^2.5" - }, - "require-dev": { - "fabpot/php-cs-fixer": "1.7.*", - "phpunit/phpunit": "~4.0" - }, - "type": "library", - "extra": { - "branch-alias": { - "dev-master": "2.5-dev", - "dev-develop": "2.6-dev" - } - }, - "autoload": { - "psr-4": { - "Zend\\Uri\\": "src/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "BSD-3-Clause" - ], - "description": "a component that aids in manipulating and validating ยป Uniform Resource Identifiers (URIs)", - "homepage": "https://github.com/zendframework/zend-uri", - "keywords": [ - "uri", - "zf2" - ], - "time": "2016-05-11 18:49:50" - }, - { - "name": "zendframework/zend-validator", - "version": "dev-develop", - "source": { - "type": "git", - "url": "https://github.com/zendframework/zend-validator.git", - "reference": "8c6b4f2d37c7ef21a979638cb48e03c043062c8a" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/zendframework/zend-validator/zipball/8c6b4f2d37c7ef21a979638cb48e03c043062c8a", - "reference": "8c6b4f2d37c7ef21a979638cb48e03c043062c8a", - "shasum": "" - }, - "require": { - "container-interop/container-interop": "^1.1", - "php": "^5.5 || ^7.0", - "zendframework/zend-stdlib": "^2.7 || ^3.0" - }, - "require-dev": { - "fabpot/php-cs-fixer": "1.7.*", - "phpunit/phpunit": "^4.0", - "zendframework/zend-cache": "^2.6.1", - "zendframework/zend-config": "^2.6", - "zendframework/zend-db": "^2.7", - "zendframework/zend-filter": "^2.6", - "zendframework/zend-http": "^2.5.4", - "zendframework/zend-i18n": "^2.6", - "zendframework/zend-math": "^2.6", - "zendframework/zend-servicemanager": "^2.7.5 || ^3.0.3", - "zendframework/zend-session": "^2.6.2", - "zendframework/zend-uri": "^2.5" - }, - "suggest": { - "zendframework/zend-db": "Zend\\Db component", - "zendframework/zend-filter": "Zend\\Filter component, required by the Digits validator", - "zendframework/zend-i18n": "Zend\\I18n component to allow translation of validation error messages as well as to use the various Date validators", - "zendframework/zend-i18n-resources": "Translations of validator messages", - "zendframework/zend-math": "Zend\\Math component", - "zendframework/zend-servicemanager": "Zend\\ServiceManager component to allow using the ValidatorPluginManager and validator chains", - "zendframework/zend-session": "Zend\\Session component", - "zendframework/zend-uri": "Zend\\Uri component, required by the Uri and Sitemap\\Loc validators" - }, - "type": "library", - "extra": { - "branch-alias": { - "dev-master": "2.8-dev", - "dev-develop": "2.9-dev" - }, - "zf": { - "component": "Zend\\Validator", - "config-provider": "Zend\\Validator\\ConfigProvider" - } - }, - "autoload": { - "psr-4": { - "Zend\\Validator\\": "src/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "BSD-3-Clause" - ], - "description": "provides a set of commonly needed validators", - "homepage": "https://github.com/zendframework/zend-validator", - "keywords": [ - "validator", - "zf2" - ], - "time": "2017-01-29 17:28:12" } ], "packages-dev": [], "aliases": [], "minimum-stability": "dev", "stability-flags": { - "sciactive/nymph-server": 20, - "devristo/phpws": 20 + "sciactive/nymph-server": 20 }, "prefer-stable": false, "prefer-lowest": false, diff --git a/src/HookMethods.php b/src/HookMethods.php index 648e7df..7d3844c 100644 --- a/src/HookMethods.php +++ b/src/HookMethods.php @@ -1,6 +1,7 @@ -saveEntity', -10, function(&$arguments, $name, &$object, &$function, &$data){ - $data['entity'] = $arguments[0]; - $data['guid'] = $arguments[0]->guid; - }); - Hook::addCallback('Nymph->saveEntity', 10, function(&$return, $name, &$object, &$function, &$data){ - if (!$return[0]) { - return; - } - HookMethods::sendMessage(json_encode(['action' => 'publish', 'event' => $data['guid'] === $data['entity']->guid ? 'update' : 'create', 'guid' => $data['entity']->guid, 'entity' => $data['entity']->jsonSerialize(false)])); - }); - Hook::addCallback('Nymph->deleteEntity', -10, function(&$arguments, $name, &$object, &$function, &$data){ - $data['guid'] = $arguments[0]->guid; - }); - Hook::addCallback('Nymph->deleteEntity', 10, function(&$return, $name, &$object, &$function, &$data){ - if (!$return[0]) { - return; - } - HookMethods::sendMessage(json_encode(['action' => 'publish', 'event' => 'delete', 'guid' => $data['guid']])); - }); - Hook::addCallback('Nymph->deleteEntityByID', -10, function(&$arguments, $name, &$object, &$function, &$data){ - $data['guid'] = $arguments[0]; - }); - Hook::addCallback('Nymph->deleteEntityByID', 10, function(&$return, $name, &$object, &$function, &$data){ - if (!$return[0]) { - return; - } - HookMethods::sendMessage(json_encode(['action' => 'publish', 'event' => 'delete', 'guid' => $data['guid']])); - }); - Hook::addCallback('Nymph->newUID', -10, function(&$arguments, $name, &$object, &$function, &$data){ - $data['name'] = $arguments[0]; - }); - Hook::addCallback('Nymph->newUID', 10, function(&$return, $name, &$object, &$function, &$data){ - if (!isset($return[0])) { - return; - } - HookMethods::sendMessage(json_encode(['action' => 'publish', 'event' => 'newUID', 'name' => $data['name']])); - }); - Hook::addCallback('Nymph->setUID', -10, function(&$arguments, $name, &$object, &$function, &$data){ - $data['name'] = $arguments[0]; - }); - Hook::addCallback('Nymph->setUID', 10, function(&$return, $name, &$object, &$function, &$data){ - if (!$return[0]) { - return; - } - HookMethods::sendMessage(json_encode(['action' => 'publish', 'event' => 'setUID', 'name' => $data['name']])); - }); - Hook::addCallback('Nymph->renameUID', -10, function(&$arguments, $name, &$object, &$function, &$data){ - $data['oldName'] = $arguments[0]; - $data['newName'] = $arguments[1]; - }); - Hook::addCallback('Nymph->renameUID', 10, function(&$return, $name, &$object, &$function, &$data){ - if (!$return[0]) { - return; - } - HookMethods::sendMessage(json_encode(['action' => 'publish', 'event' => 'renameUID', 'oldName' => $data['oldName'], 'newName' => $data['newName']])); - }); - Hook::addCallback('Nymph->deleteUID', -10, function(&$arguments, $name, &$object, &$function, &$data){ - $data['name'] = $arguments[0]; - }); - Hook::addCallback('Nymph->deleteUID', 10, function(&$return, $name, &$object, &$function, &$data){ - if (!$return[0]) { - return; - } - HookMethods::sendMessage(json_encode(['action' => 'publish', 'event' => 'deleteUID', 'name' => $data['name']])); - }); + Hook::addCallback( + 'Nymph->saveEntity', + -10, + function (&$arguments, $name, &$object, &$function, &$data) { + $data['entity'] = $arguments[0]; + $data['guid'] = $arguments[0]->guid; + } + ); + Hook::addCallback( + 'Nymph->saveEntity', + 10, + function (&$return, $name, &$object, &$function, &$data) { + if (!$return[0]) { + return; + } + HookMethods::sendMessage( + json_encode( + [ + 'action' => 'publish', + 'event' => $data['guid'] === $data['entity']->guid + ? 'update' + : 'create', + 'guid' => $data['entity']->guid, + 'entity' => $data['entity']->jsonSerialize(false) + ] + ) + ); + } + ); + Hook::addCallback( + 'Nymph->deleteEntity', + -10, + function (&$arguments, $name, &$object, &$function, &$data) { + $data['guid'] = $arguments[0]->guid; + } + ); + Hook::addCallback( + 'Nymph->deleteEntity', + 10, + function (&$return, $name, &$object, &$function, &$data) { + if (!$return[0]) { + return; + } + HookMethods::sendMessage( + json_encode( + [ + 'action' => 'publish', + 'event' => 'delete', + 'guid' => $data['guid'] + ] + ) + ); + } + ); + Hook::addCallback( + 'Nymph->deleteEntityByID', + -10, + function (&$arguments, $name, &$object, &$function, &$data) { + $data['guid'] = $arguments[0]; + } + ); + Hook::addCallback( + 'Nymph->deleteEntityByID', + 10, + function (&$return, $name, &$object, &$function, &$data) { + if (!$return[0]) { + return; + } + HookMethods::sendMessage( + json_encode( + [ + 'action' => 'publish', + 'event' => 'delete', + 'guid' => $data['guid'] + ] + ) + ); + } + ); + Hook::addCallback( + 'Nymph->newUID', + -10, + function (&$arguments, $name, &$object, &$function, &$data) { + $data['name'] = $arguments[0]; + } + ); + Hook::addCallback( + 'Nymph->newUID', + 10, + function (&$return, $name, &$object, &$function, &$data) { + if (!isset($return[0])) { + return; + } + HookMethods::sendMessage( + json_encode( + [ + 'action' => 'publish', + 'event' => 'newUID', + 'name' => $data['name'] + ] + ) + ); + } + ); + Hook::addCallback( + 'Nymph->setUID', + -10, + function (&$arguments, $name, &$object, &$function, &$data) { + $data['name'] = $arguments[0]; + } + ); + Hook::addCallback( + 'Nymph->setUID', + 10, + function (&$return, $name, &$object, &$function, &$data) { + if (!$return[0]) { + return; + } + HookMethods::sendMessage( + json_encode( + [ + 'action' => 'publish', + 'event' => 'setUID', + 'name' => $data['name'] + ] + ) + ); + } + ); + Hook::addCallback( + 'Nymph->renameUID', + -10, + function (&$arguments, $name, &$object, &$function, &$data) { + $data['oldName'] = $arguments[0]; + $data['newName'] = $arguments[1]; + } + ); + Hook::addCallback( + 'Nymph->renameUID', + 10, + function (&$return, $name, &$object, &$function, &$data) { + if (!$return[0]) { + return; + } + HookMethods::sendMessage( + json_encode( + [ + 'action' => 'publish', + 'event' => 'renameUID', + 'oldName' => $data['oldName'], + 'newName' => $data['newName'] + ] + ) + ); + } + ); + Hook::addCallback( + 'Nymph->deleteUID', + -10, + function (&$arguments, $name, &$object, &$function, &$data) { + $data['name'] = $arguments[0]; + } + ); + Hook::addCallback( + 'Nymph->deleteUID', + 10, + function (&$return, $name, &$object, &$function, &$data) { + if (!$return[0]) { + return; + } + HookMethods::sendMessage( + json_encode( + [ + 'action' => 'publish', + 'event' => 'deleteUID', + 'name' => $data['name'] + ] + ) + ); + } + ); } public static function sendMessage($message) { - try { - $config = \SciActive\RequirePHP::_('NymphPubSubConfig'); - - $loop = \React\EventLoop\Factory::create(); - - $logger = new \Zend\Log\Logger(); - $writer = new \Zend\Log\Writer\Stream("php://stderr"); - $logger->addWriter($writer); - - foreach ($config['entries'] as $host) { - $client = new \Devristo\Phpws\Client\WebSocket($host, $loop, $logger); - - $client->on("connect", function() use ($message, $client){ - $client->send($message); - $client->close(); - }); - - $client->open(); - } + $config = \SciActive\RequirePHP::_('NymphPubSubConfig'); - $loop->run(); - } catch (\React\SocketClient\ConnectionException $e) { - // Ignore a failed connection. + foreach ($config['entries'] as $host) { + $client = new TextalkWebSocketClient($host); + $client->send($message); } } -} \ No newline at end of file +} diff --git a/src/MessageHandler.php b/src/MessageHandler.php index 9095296..821bfb7 100644 --- a/src/MessageHandler.php +++ b/src/MessageHandler.php @@ -2,36 +2,46 @@ use \Ratchet\MessageComponentInterface; use \Ratchet\ConnectionInterface; -use \SciActive\RequirePHP as RequirePHP; +use \SciActive\RequirePHP; +use \WebSocket\Client as TextalkWebSocketClient; /** * Handle subscriptions and publications. */ -class MessageHandler extends MessageComponentInterface { +class MessageHandler implements MessageComponentInterface { + private $logger; protected $subscriptions = [ 'queries' => [], 'uids' => [] ]; + public function __construct(\Zend\Log\Logger $logger) { + $this->logger = $logger; + } + /** * Log users who join. * - * @param ConnectionInterface $user + * @param ConnectionInterface $conn */ - public function onOpen(ConnectionInterface $user) { - var_dump($user); - $this->logger->notice("Client joined the party! ({$user->getId()})"); + public function onOpen(ConnectionInterface $conn) { + // var_dump($conn); + $this->logger->notice("Client joined the party! ({$conn->resourceId})"); } /** * Handle a message from a client. * - * @param ConnectionInterface $user + * @param ConnectionInterface $from * @param string $msg */ - public function onMessage(ConnectionInterface $user, $msg) { + public function onMessage(ConnectionInterface $from, $msg) { $data = json_decode($msg, true); - if (!$data['action'] || !in_array($data['action'], ['subscribe', 'unsubscribe', 'publish'])) { + if (!$data['action'] + || !in_array( + $data['action'], + ['subscribe', 'unsubscribe', 'publish'] + )) { return; } switch ($data['action']) { @@ -62,20 +72,36 @@ public function onMessage(ConnectionInterface $user, $msg) { $guidArgs = $args; $guidArgs[0]['return'] = 'guid'; $this->subscriptions['queries'][$serialArgs] = [ - 'current' => call_user_func_array("\Nymph\Nymph::getEntities", $guidArgs) + 'current' => call_user_func_array( + "\Nymph\Nymph::getEntities", + $guidArgs + ) ]; } - $this->subscriptions['queries'][$serialArgs][] = ['client' => $user, 'query' => $data['query'], 'count' => !!$data['count']]; - $this->logger->notice("Client subscribed to a query! ($serialArgs, {$user->getId()})"); + $this->subscriptions['queries'][$serialArgs][] = + [ + 'client' => $from, + 'query' => $data['query'], + 'count' => !!$data['count'] + ]; + $this->logger->notice( + "Client subscribed to a query! " . + "($serialArgs, {$from->resourceId})" + ); if (RequirePHP::_('NymphPubSubConfig')['broadcast_counts']) { // Notify clients of the subscription count. $count = count($this->subscriptions['queries'][$serialArgs]) - 1; - foreach ($this->subscriptions['queries'][$serialArgs] as $key => $curClient) { + foreach ($this->subscriptions['queries'][$serialArgs] as + $key => $curClient) { if ($key === 'current') { continue; } if ($curClient['count']) { - $curClient['client']->sendString(json_encode(['query' => $curClient['query'], 'count' => $count])); + $curClient['client']->send( + json_encode( + ['query' => $curClient['query'], 'count' => $count] + ) + ); } } } @@ -83,22 +109,36 @@ public function onMessage(ConnectionInterface $user, $msg) { if (!key_exists($serialArgs, $this->subscriptions['queries'])) { return; } - foreach ($this->subscriptions['queries'][$serialArgs] as $key => $value) { + foreach ($this->subscriptions['queries'][$serialArgs] as + $key => $value) { if ($key === 'current') { continue; } - if ($user->getId() === $value['client']->getId() && $data['query'] === $value['query']) { + if ($from->resourceId === $value['client']->resourceId + && $data['query'] === $value['query']) { unset($this->subscriptions['queries'][$serialArgs][$key]); - $this->logger->notice("Client unsubscribed from a query! ($serialArgs, {$user->getId()})"); + $this->logger->notice( + "Client unsubscribed from a query! ". + "($serialArgs, {$from->resourceId})" + ); if (RequirePHP::_('NymphPubSubConfig')['broadcast_counts']) { // Notify clients of the subscription count. - $count = count($this->subscriptions['queries'][$serialArgs]) - 1; - foreach ($this->subscriptions['queries'][$serialArgs] as $key => $curClient) { + $count = + count($this->subscriptions['queries'][$serialArgs]) - 1; + foreach ($this->subscriptions['queries'][$serialArgs] as + $key => $curClient) { if ($key === 'current') { continue; } if ($curClient['count']) { - $curClient['client']->sendString(json_encode(['query' => $curClient['query'], 'count' => $count])); + $curClient['client']->send( + json_encode( + [ + 'query' => $curClient['query'], + 'count' => $count + ] + ) + ); } } } @@ -113,14 +153,23 @@ public function onMessage(ConnectionInterface $user, $msg) { if (!key_exists($data['uid'], $this->subscriptions['uids'])) { $this->subscriptions['uids'][$data['uid']] = []; } - $this->subscriptions['uids'][$data['uid']][] = ['client' => $user, 'count' => !!$data['count']]; - $this->logger->notice("Client subscribed to a UID! ({$data['uid']}, {$user->getId()})"); + $this->subscriptions['uids'][$data['uid']][] = + ['client' => $from, 'count' => !!$data['count']]; + $this->logger->notice( + "Client subscribed to a UID! " . + "({$data['uid']}, {$from->resourceId})" + ); if (RequirePHP::_('NymphPubSubConfig')['broadcast_counts']) { // Notify clients of the subscription count. $count = count($this->subscriptions['uids'][$data['uid']]); - foreach ($this->subscriptions['uids'][$data['uid']] as $curClient) { + foreach ($this->subscriptions['uids'][$data['uid']] as + $curClient) { if ($curClient['count']) { - $curClient['client']->sendString(json_encode(['uid' => $data['uid'], 'count' => $count])); + $curClient['client']->send( + json_encode( + ['uid' => $data['uid'], 'count' => $count] + ) + ); } } } @@ -128,16 +177,25 @@ public function onMessage(ConnectionInterface $user, $msg) { if (!key_exists($data['uid'], $this->subscriptions['uids'])) { return; } - foreach ($this->subscriptions['uids'][$data['uid']] as $key => $value) { - if ($user->getId() === $value['client']->getId()) { + foreach ($this->subscriptions['uids'][$data['uid']] as + $key => $value) { + if ($from->resourceId === $value['client']->resourceId) { unset($this->subscriptions['uids'][$data['uid']][$key]); - $this->logger->notice("Client unsubscribed from a UID! ({$data['uid']}, {$user->getId()})"); + $this->logger->notice( + "Client unsubscribed from a UID! " . + "({$data['uid']}, {$from->resourceId})" + ); if (RequirePHP::_('NymphPubSubConfig')['broadcast_counts']) { // Notify clients of the subscription count. $count = count($this->subscriptions['uids'][$data['uid']]); - foreach ($this->subscriptions['uids'][$data['uid']] as $curClient) { + foreach ($this->subscriptions['uids'][$data['uid']] as + $curClient) { if ($curClient['count']) { - $curClient['client']->sendString(json_encode(['uid' => $data['uid'], 'count' => $count])); + $curClient['client']->send( + json_encode( + ['uid' => $data['uid'], 'count' => $count] + ) + ); } } } @@ -151,34 +209,45 @@ public function onMessage(ConnectionInterface $user, $msg) { } break; case 'publish': - if ( - isset($data['guid']) && - ( - $data['event'] === 'delete' || - ( - isset($data['entity']) && - ($data['event'] === 'create' || $data['event'] === 'update') + if (isset($data['guid']) + && ( + $data['event'] === 'delete' + || ( + isset($data['entity']) + && ($data['event'] === 'create' || $data['event'] === 'update') ) - ) - ) { - $this->logger->notice("Received an entity publish! ({$data['guid']}, {$data['event']}, {$user->getId()})"); + )) { + $this->logger->notice( + "Received an entity publish! " . + "({$data['guid']}, {$data['event']}, {$from->resourceId})" + ); // Relay the publish to other servers. $this->relay($msg); - foreach ($this->subscriptions['queries'] as $curQuery => &$curClients) { + foreach ($this->subscriptions['queries'] as + $curQuery => &$curClients) { if ($data['event'] === 'delete' || $data['event'] === 'update') { // Check if it is in any queries' currents. if (in_array($data['guid'], $curClients['current'])) { // Update currents list. $guidArgs = unserialize($curQuery); $guidArgs[0]['return'] = 'guid'; - $curClients['current'] = call_user_func_array("\Nymph\Nymph::getEntities", $guidArgs); + $curClients['current'] = + call_user_func_array( + "\Nymph\Nymph::getEntities", + $guidArgs + ); // Notify subscribers. foreach ($curClients as $key => $curClient) { if ($key === 'current') { continue; } - $this->logger->notice("Notifying client of modification! ({$curClient['client']->getId()})"); - $curClient['client']->sendString(json_encode(['query' => $curClient['query']])); + $this->logger->notice( + "Notifying client of modification! " . + "({$curClient['client']->resourceId})" + ); + $curClient['client']->send( + json_encode(['query' => $curClient['query']]) + ); } continue; } @@ -194,11 +263,22 @@ public function onMessage(ConnectionInterface $user, $msg) { $entityData['mdate'] = $data['entity']['mdate']; $entitySData = []; - if ($options['class'] === $data['entity']['class'] && \Nymph\Nymph::checkData($entityData, $entitySData, $selectors, $data['guid'], $data['entity']['tags'])) { + if ($options['class'] === $data['entity']['class'] + && \Nymph\Nymph::checkData( + $entityData, + $entitySData, + $selectors, + $data['guid'], + $data['entity']['tags'] + )) { // Update currents list. $guidArgs = unserialize($curQuery); $guidArgs[0]['return'] = 'guid'; - $curClients['current'] = call_user_func_array("\Nymph\Nymph::getEntities", $guidArgs); + $curClients['current'] = + call_user_func_array( + "\Nymph\Nymph::getEntities", + $guidArgs + ); // If we're here, it means the query didn't // match the entity before, and now it does. We // could check currents to see if it's been @@ -210,24 +290,48 @@ public function onMessage(ConnectionInterface $user, $msg) { if ($key === 'current') { continue; } - $this->logger->notice("Notifying client of new match! ({$curClient['client']->getId()})"); - $curClient['client']->sendString(json_encode(['query' => $curClient['query']])); + $this->logger->notice( + "Notifying client of new match! " . + "({$curClient['client']->resourceId})" + ); + $curClient['client']->send( + json_encode(['query' => $curClient['query']]) + ); } } } } unset($curClients); - } elseif ((isset($data['name']) || (isset($data['oldName']) && isset($data['newName']))) && in_array($data['event'], ['newUID', 'setUID', 'renameUID', 'deleteUID'])) { - $this->logger->notice("Received a UID publish! (".(isset($data['name']) ? $data['name'] : "{$data['oldName']} => {$data['newName']}").", {$data['event']}, {$user->getId()})"); + } elseif (( + isset($data['name']) + || (isset($data['oldName']) && isset($data['newName'])) + ) + && in_array( + $data['event'], + ['newUID', 'setUID', 'renameUID', 'deleteUID'] + )) { + $this->logger->notice( + "Received a UID publish! (" . + ( + isset($data['name']) + ? $data['name'] + : "{$data['oldName']} => {$data['newName']}" + ) . + ", {$data['event']}, {$from->resourceId})" + ); // Relay the publish to other servers. $this->relay($msg); foreach ($data as $key => $name) { - if (!in_array($key, ['name', 'newName', 'oldName']) || !key_exists($name, $this->subscriptions['uids'])) { + if (!in_array($key, ['name', 'newName', 'oldName']) + || !key_exists($name, $this->subscriptions['uids'])) { continue; } foreach ($this->subscriptions['uids'][$name] as $curClient) { - $this->logger->notice("Notifying client of {$data['event']}! ($name, {$curClient['client']->getId()})"); - $curClient['client']->sendString(json_encode(['uid' => $name])); + $this->logger->notice( + "Notifying client of {$data['event']}! " . + "($name, {$curClient['client']->resourceId})" + ); + $curClient['client']->send(json_encode(['uid' => $name])); } } } @@ -238,10 +342,10 @@ public function onMessage(ConnectionInterface $user, $msg) { /** * Clean up after users who leave. * - * @param ConnectionInterface $user + * @param ConnectionInterface $conn */ - public function onClose(ConnectionInterface $user) { - $this->logger->notice("Client skedaddled. ({$user->getId()})"); + public function onClose(ConnectionInterface $conn) { + $this->logger->notice("Client skedaddled. ({$conn->resourceId})"); $mess = 0; foreach ($this->subscriptions['queries'] as $curQuery => &$curClients) { @@ -249,7 +353,7 @@ public function onClose(ConnectionInterface $user) { if ($key === 'current') { continue; } - if ($user->getId() === $curClient['client']->getId()) { + if ($conn->resourceId === $curClient['client']->resourceId) { unset($curClients[$key]); if (RequirePHP::_('NymphPubSubConfig')['broadcast_counts']) { // Notify clients of the subscription count. @@ -259,7 +363,14 @@ public function onClose(ConnectionInterface $user) { continue; } if ($curCountClient['count']) { - $curCountClient['client']->sendString(json_encode(['query' => $curCountClient['query'], 'count' => $count])); + $curCountClient['client']->send( + json_encode( + [ + 'query' => $curCountClient['query'], + 'count' => $count + ] + ) + ); } } } @@ -273,14 +384,16 @@ public function onClose(ConnectionInterface $user) { unset($curClients); foreach ($this->subscriptions['uids'] as $curUID => &$curClients) { foreach ($curClients as $key => $curClient) { - if ($user->getId() === $curClient['client']->getId()) { + if ($conn->resourceId === $curClient['client']->resourceId) { unset($curClients[$key]); if (RequirePHP::_('NymphPubSubConfig')['broadcast_counts']) { // Notify clients of the subscription count. $count = count($curClients); foreach ($curClients as $curCountClient) { if ($curCountClient['count']) { - $curCountClient['client']->sendString(json_encode(['uid' => $curUID, 'count' => $count])); + $curCountClient['client']->send( + json_encode(['uid' => $curUID, 'count' => $count]) + ); } } } @@ -294,11 +407,14 @@ public function onClose(ConnectionInterface $user) { unset($curClients); if ($mess) { - $this->logger->notice("Cleaned up client's mess. ($mess, {$user->getId()})"); + $this->logger->notice( + "Cleaned up client's mess. " . + "($mess, {$conn->resourceId})" + ); } } - public function onError(ConnectionInterface $user, \Exception $e) { + public function onError(ConnectionInterface $conn, \Exception $e) { $this->logger->error("An error occured. ({$e->getMessage()})"); } @@ -314,23 +430,9 @@ private function relay($message) { return; } - $loop = \React\EventLoop\Factory::create(); - - $logger = new \Zend\Log\Logger(); - $writer = new \Zend\Log\Writer\Stream("php://stderr"); - $logger->addWriter($writer); - foreach ($config['relays'] as $host) { - $client = new \Devristo\Phpws\Client\WebSocket($host, $loop, $logger); - - $client->on("connect", function () use ($message, $client) { - $client->send($message); - $client->close(); - }); - - $client->open(); + $client = new TextalkWebSocketClient($host); + $client->send($message); } - - $loop->run(); } } diff --git a/src/MessageHandlerForUnroutedUrls.php b/src/MessageHandlerForUnroutedUrls.php deleted file mode 100644 index 5149947..0000000 --- a/src/MessageHandlerForUnroutedUrls.php +++ /dev/null @@ -1,19 +0,0 @@ -logger->notice("Client doesn't know what he's doing. ({$user->getId()})"); - } - public function onMessage(WebSocketTransportInterface $user, WebSocketMessageInterface $msg) { - //do nothing - $this->logger->notice("Client is talking to the wind. ({$msg->getData()}, {$user->getId()})"); - } -} \ No newline at end of file diff --git a/src/Server.php b/src/Server.php index 44c1557..77034e2 100644 --- a/src/Server.php +++ b/src/Server.php @@ -6,10 +6,8 @@ use \SciActive\RequirePHP; class Server { - private $loop; private $logger; private $writer; - private $router; private $server; /** @@ -50,8 +48,6 @@ public function __construct($config = []) { self::configure($config); $config = RequirePHP::_('NymphPubSubConfig'); - // $this->loop = \React\EventLoop\Factory::create(); - // Create a logger which writes everything to the STDOUT $this->logger = new \Zend\Log\Logger(); $this->writer = new \Zend\Log\Writer\Stream("php://output"); @@ -72,37 +68,15 @@ public function __construct($config = []) { } throw $e; } - // $this->server = new WebSocketServer( - // "tcp://{$config['host']}:{$config['port']}", - // $this->loop, - // $this->logger - // ); - // - // $server->run(); - // - // // Create a router which transfers all /chat connections to the - // // MessageHandler class - // $this->router = new ClientRouter($this->server, $this->logger); - // - // // route / url - // $this->router->addRoute('#^/$#i', new MessageHandler($this->logger)); - // - // // route unmatched urls - // $this->router->addRoute( - // '#^(.*)$#i', - // new MessageHandlerForUnroutedUrls($this->logger) - // ); - // - // // Bind the server - // $this->server->bind(); $this->server = IoServer::factory( new HttpServer( new WsServer( - new MessageHandler() + new MessageHandler($this->logger) ) ), - $config['port'] + $config['port'], + $config['host'] ); } @@ -116,7 +90,6 @@ public function __destruct() { } throw $e; } - // $this->server->removeAllListeners(); $this->stop(); }