diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d66edda --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea +.DS_Store +vendor diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..ab51401 --- /dev/null +++ b/composer.json @@ -0,0 +1,19 @@ +{ + "name": "mannum/eventor", + "description": "Simple event-sourcing framework for EventStore.", + "keywords": ["event-sourcing"], + + "authors": [ + { + "name": "Alex Haproff", + "email": "alex@haproff.com" + } + ], + "require": { + "predis/predis": "^1.0", + "guzzlehttp/guzzle": "^6.2" + }, + "autoload": { + "psr-4": {"Eventor\\": "src/Eventor"} + } +} diff --git a/composer.lock b/composer.lock new file mode 100644 index 0000000..18de621 --- /dev/null +++ b/composer.lock @@ -0,0 +1,289 @@ +{ + "_readme": [ + "This file locks the dependencies of your project to a known state", + "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file", + "This file is @generated automatically" + ], + "hash": "68a2a7f617dc1a38d76e49b26d2eda67", + "content-hash": "1fabd32c5d436d93946230be43d829c5", + "packages": [ + { + "name": "guzzlehttp/guzzle", + "version": "6.2.0", + "source": { + "type": "git", + "url": "https://github.com/guzzle/guzzle.git", + "reference": "d094e337976dff9d8e2424e8485872194e768662" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/guzzle/guzzle/zipball/d094e337976dff9d8e2424e8485872194e768662", + "reference": "d094e337976dff9d8e2424e8485872194e768662", + "shasum": "" + }, + "require": { + "guzzlehttp/promises": "~1.0", + "guzzlehttp/psr7": "~1.1", + "php": ">=5.5.0" + }, + "require-dev": { + "ext-curl": "*", + "phpunit/phpunit": "~4.0", + "psr/log": "~1.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "6.2-dev" + } + }, + "autoload": { + "files": [ + "src/functions_include.php" + ], + "psr-4": { + "GuzzleHttp\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Michael Dowling", + "email": "mtdowling@gmail.com", + "homepage": "https://github.com/mtdowling" + } + ], + "description": "Guzzle is a PHP HTTP client library", + "homepage": "http://guzzlephp.org/", + "keywords": [ + "client", + "curl", + "framework", + "http", + "http client", + "rest", + "web service" + ], + "time": "2016-03-21 20:02:09" + }, + { + "name": "guzzlehttp/promises", + "version": "1.1.0", + "source": { + "type": "git", + "url": "https://github.com/guzzle/promises.git", + "reference": "bb9024c526b22f3fe6ae55a561fd70653d470aa8" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/guzzle/promises/zipball/bb9024c526b22f3fe6ae55a561fd70653d470aa8", + "reference": "bb9024c526b22f3fe6ae55a561fd70653d470aa8", + "shasum": "" + }, + "require": { + "php": ">=5.5.0" + }, + "require-dev": { + "phpunit/phpunit": "~4.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0-dev" + } + }, + "autoload": { + "psr-4": { + "GuzzleHttp\\Promise\\": "src/" + }, + "files": [ + "src/functions_include.php" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Michael Dowling", + "email": "mtdowling@gmail.com", + "homepage": "https://github.com/mtdowling" + } + ], + "description": "Guzzle promises library", + "keywords": [ + "promise" + ], + "time": "2016-03-08 01:15:46" + }, + { + "name": "guzzlehttp/psr7", + "version": "1.3.0", + "source": { + "type": "git", + "url": "https://github.com/guzzle/psr7.git", + "reference": "31382fef2889136415751badebbd1cb022a4ed72" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/guzzle/psr7/zipball/31382fef2889136415751badebbd1cb022a4ed72", + "reference": "31382fef2889136415751badebbd1cb022a4ed72", + "shasum": "" + }, + "require": { + "php": ">=5.4.0", + "psr/http-message": "~1.0" + }, + "provide": { + "psr/http-message-implementation": "1.0" + }, + "require-dev": { + "phpunit/phpunit": "~4.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0-dev" + } + }, + "autoload": { + "psr-4": { + "GuzzleHttp\\Psr7\\": "src/" + }, + "files": [ + "src/functions_include.php" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Michael Dowling", + "email": "mtdowling@gmail.com", + "homepage": "https://github.com/mtdowling" + } + ], + "description": "PSR-7 message implementation", + "keywords": [ + "http", + "message", + "stream", + "uri" + ], + "time": "2016-04-13 19:56:01" + }, + { + "name": "predis/predis", + "version": "v1.0.3", + "source": { + "type": "git", + "url": "https://github.com/nrk/predis.git", + "reference": "84060b9034d756b4d79641667d7f9efe1aeb8e04" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/nrk/predis/zipball/84060b9034d756b4d79641667d7f9efe1aeb8e04", + "reference": "84060b9034d756b4d79641667d7f9efe1aeb8e04", + "shasum": "" + }, + "require": { + "php": ">=5.3.2" + }, + "require-dev": { + "phpunit/phpunit": "~4.0" + }, + "suggest": { + "ext-curl": "Allows access to Webdis when paired with phpiredis", + "ext-phpiredis": "Allows faster serialization and deserialization of the Redis protocol" + }, + "type": "library", + "autoload": { + "psr-4": { + "Predis\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Daniele Alessandri", + "email": "suppakilla@gmail.com", + "homepage": "http://clorophilla.net" + } + ], + "description": "Flexible and feature-complete PHP client library for Redis", + "homepage": "http://github.com/nrk/predis", + "keywords": [ + "nosql", + "predis", + "redis" + ], + "time": "2015-07-30 18:34:15" + }, + { + "name": "psr/http-message", + "version": "1.0", + "source": { + "type": "git", + "url": "https://github.com/php-fig/http-message.git", + "reference": "85d63699f0dbedb190bbd4b0d2b9dc707ea4c298" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/http-message/zipball/85d63699f0dbedb190bbd4b0d2b9dc707ea4c298", + "reference": "85d63699f0dbedb190bbd4b0d2b9dc707ea4c298", + "shasum": "" + }, + "require": { + "php": ">=5.3.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0.x-dev" + } + }, + "autoload": { + "psr-4": { + "Psr\\Http\\Message\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "http://www.php-fig.org/" + } + ], + "description": "Common interface for HTTP messages", + "keywords": [ + "http", + "http-message", + "psr", + "psr-7", + "request", + "response" + ], + "time": "2015-05-04 20:22:00" + } + ], + "packages-dev": [], + "aliases": [], + "minimum-stability": "stable", + "stability-flags": [], + "prefer-stable": false, + "prefer-lowest": false, + "platform": [], + "platform-dev": [] +} diff --git a/src/Eventor/CommandBus.php b/src/Eventor/CommandBus.php new file mode 100644 index 0000000..98928a9 --- /dev/null +++ b/src/Eventor/CommandBus.php @@ -0,0 +1,22 @@ +commandHandlers[$messageName] = $handler; + } + + public function handle($message) + { + if (empty($message::$name)) return; + + if (empty($this->commandHandlers[$message::$name])) return; + + $this->commandHandlers[$message::$name]($message); + } +} \ No newline at end of file diff --git a/src/Eventor/CommandBusInterface.php b/src/Eventor/CommandBusInterface.php new file mode 100644 index 0000000..28a7943 --- /dev/null +++ b/src/Eventor/CommandBusInterface.php @@ -0,0 +1,12 @@ +type = $type; + $this->streamName = $streamName; + $this->id = $id; + + if ( ! $this->id) { + $this->id = $this->uuid(); + } + + $this->setData($data); + } + + public function getStreamName() + { + return $this->streamName; + } + + public function getId() + { + return $this->id; + } + + public function getData() + { + return $this->data; + } + + public function setData(array $d) + { + $this->data = $d; + } + + public function getType() + { + return $this->type; + } + + public function toJson() + { + return json_encode([ + 'eventType' => $this->type, + 'streamName' => $this->streamName, + 'eventId' => $this->id, + 'data' => $this->data + ]); + } + + public static function fromJson($j) + { + $e = json_decode($j, true); + + if (empty($e['eventType']) || empty($e['streamName']) || empty($e['eventId']) || empty($e['data'])) { + throw new \InvalidArgumentException(sprintf('Invalid JSON supplied, unable to convert to event, data supplied [%s]', $e)); + } + + return new self($e['eventType'], $e['streamName'], $e['data'], $e['eventId']); + } + + private function uuid() + { + $data = openssl_random_pseudo_bytes(16); + $data[6] = chr(ord($data[6]) & 0x0f | 0x40); // set version to 0100 + $data[8] = chr(ord($data[8]) & 0x3f | 0x80); // set bits 6-7 to 10 + return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($data), 4)); + } +} \ No newline at end of file diff --git a/src/Eventor/EventInterface.php b/src/Eventor/EventInterface.php new file mode 100644 index 0000000..360c8a7 --- /dev/null +++ b/src/Eventor/EventInterface.php @@ -0,0 +1,18 @@ +url = $url; + } + + public function getUrl() + { + return $this->url; + } + + public function getStreamUrl($s) + { + return sprintf('%s/streams/%s', $this->url, $s); + } + + public function getStreamForwardUrl($stream, $startPosition, $pageSize) + { + return sprintf('%s/%s/forward/%s', $this->getStreamUrl($stream), $startPosition, $pageSize); + } + + public function readEvents($url) + { + $client = $this->createHttpClient(); + + $response = $client->get($url, [ + 'headers' => [ + 'Accept' => 'application/vnd.eventstore.atom+json' + ] + ]); + + return json_decode($response->getBody(), true); + } + + public function writeEvent(EventInterface $e) + { + $client = $this->createHttpClient(); + + $client->post($this->getStreamUrl($e->getStreamName()), [ + 'headers' => [ + 'Content-Type' => 'application/json', + 'ES-ExpectedVersion' => '-2', + 'ES-EventType' => $e->getType(), + 'ES-EventId' => $e->getId() + ], + 'body' => json_encode($e->getData()) + ]); + } + + private function createHttpClient() + { + $stack = HandlerStack::create(new CurlHandler()); + $stack->push(Middleware::retry($this->createRetryHandler(), function($retries) { + return 1000 * intval(pow(2, $retries - 1)); + })); + return new HttpClient([ + 'handler' => $stack, + ]); + } + + private function createRetryHandler() + { + return function ( + $retries, + Psr7Request $request, + Psr7Response $response = null, + RequestException $exception = null + ) { + if ($retries >= 3) { + return false; + } + + if ( ! ($this->isServerError($response) || $this->isConnectError($exception))) { + return false; + } + + return true; + }; + } + + private function isServerError(Psr7Response $response = null) + { + return $response && $response->getStatusCode() >= 500; + } + + private function isConnectError(RequestException $exception = null) + { + return $exception instanceof ConnectException; + } +} \ No newline at end of file diff --git a/src/Eventor/EventStoreConnectionInterface.php b/src/Eventor/EventStoreConnectionInterface.php new file mode 100644 index 0000000..e54e8fd --- /dev/null +++ b/src/Eventor/EventStoreConnectionInterface.php @@ -0,0 +1,14 @@ +es = $es; + $this->name = trim($name, " \t\n\r\0\x0B/"); + $this->lastProcessedEventId = is_null($lastProcessedEventId) ? -1 : (int) $lastProcessedEventId; + $this->batchSize = (int) $batchSize; + $this->pageSize = max((int) $pageSize, 1); // we request from ES at last one event per page request + } + + public function getName() + { + return $this->name; + } + + public function current() + { + $this->numProcessedEntries++; + + return current($this->events); + } + + public function next() + { + return next($this->events); + } + + public function key() + { + return key($this->events); + } + + public function valid() + { + // do we have more entries? lets find out, if not lets try previous page + $navigateToPreviousPage = key($this->events) === null; + $reachedBatchSizeLimit = ($this->batchSize && $this->numProcessedEntries === $this->batchSize); + + if ($reachedBatchSizeLimit) return; + + if ($navigateToPreviousPage && $this->nextPageUrl) { + $this->fetchEvents($this->nextPageUrl); + } + + return key($this->events) !== null; + } + + public function rewind() + { + $this->numProcessedEntries = 0; + $this->nextPageUrl = null; + + // we should open the feed here and get the first entries + $this->fetchEvents($this->es->getStreamForwardUrl($this->name, $this->lastProcessedEventId + 1, $this->pageSize)); + + return reset($this->events); + } + + private function fetchEvents($url) + { + $url .= '?embed=body'; + + $feed = $this->es->readEvents($url); + + if (empty($feed)) return; + + $this->extractNextPageUrl($feed); + + $this->events = []; + + foreach (array_reverse($feed['entries']) as $entry) { + $this->events[] = new Event( + $entry['eventType'], + $this->name, + json_decode($entry['data'], true), + $entry['eventId'] + ); + } + } + + private function extractNextPageUrl($feed) + { + $this->nextPageUrl = null; + + if (empty($feed['links'])) return; + + foreach ($feed['links'] as $link) { + if (empty($link['uri']) || empty($link['relation'])) continue; + + if ($link['relation'] === 'previous') { + $this->nextPageUrl = $link['uri']; + break; + } + } + } +} \ No newline at end of file diff --git a/src/Eventor/EventStreamCursor.php b/src/Eventor/EventStreamCursor.php new file mode 100644 index 0000000..c34ea31 --- /dev/null +++ b/src/Eventor/EventStreamCursor.php @@ -0,0 +1,27 @@ +redis = $redis; + } + + public function fetch($stream) + { + return max($this->redis->hget(self::HASH_NAME, $stream), -1); + } + + public function increment($stream) + { + return $this->redis->hincrby(self::HASH_NAME, $stream, 1); + } +} \ No newline at end of file diff --git a/src/Eventor/EventStreamCursorInterface.php b/src/Eventor/EventStreamCursorInterface.php new file mode 100644 index 0000000..34c0a0f --- /dev/null +++ b/src/Eventor/EventStreamCursorInterface.php @@ -0,0 +1,9 @@ +redisClient = $redisClient; + $this->queueName = $queueName; + $this->esConnection = $esConnection; + } + + public function raise(EventInterface $e) + { + $this->redisClient->rpush($this->queueName, [$e->toJson()]); + } + + public function publishToEventStore() + { + while (($eventJson = $this->redisClient->lpop($this->queueName)) !== null) { + $event = Event::fromJson($eventJson); + + try { + $this->esConnection->writeEvent($event); + } catch (\InvalidArgumentException $e) { + // @todo log exception here + // just skip, do nothing here, maybe push into invalid events queue + continue; + } catch (\Exception $e) { + // @todo and another exception + // we have a fatal error, lets insert the event back into the queue and exit + $this->redisClient->lpush($this->queueName, [$event->toJson()]); + break; + } + } + } +} \ No newline at end of file diff --git a/src/Eventor/ServiceActivator.php b/src/Eventor/ServiceActivator.php new file mode 100644 index 0000000..769e160 --- /dev/null +++ b/src/Eventor/ServiceActivator.php @@ -0,0 +1,41 @@ +activators[$eventName] = $a; + } + + public function traverseStream(EventStreamInterface $s, EventStreamCursorInterface $cursor) + { + foreach ($s as $event) { + $cursor->increment($s->getName()); + + if (empty($this->activators[$event->getType()])) continue; + + try { + $command = $this->activators[$event->getType()]($event->getData()); + $this->bus->handle($command); + } catch (\Exception $e) { + // @todo do something with exceptions here + var_dump($e->getMessage()); + } + } + } + + public function getBus() + { + return $this->bus; + } + + public function setBus(CommandBusInterface $b) + { + $this->bus = $b; + } +} \ No newline at end of file diff --git a/src/Eventor/ServiceActivatorInterface.php b/src/Eventor/ServiceActivatorInterface.php new file mode 100644 index 0000000..ae1b84f --- /dev/null +++ b/src/Eventor/ServiceActivatorInterface.php @@ -0,0 +1,13 @@ +