diff --git a/src/Eventor/EventStreamCursor.php b/src/Eventor/EventStreamCursor.php index c34ea31..f39ff96 100644 --- a/src/Eventor/EventStreamCursor.php +++ b/src/Eventor/EventStreamCursor.php @@ -7,21 +7,23 @@ class EventStreamCursor implements EventStreamCursorInterface { protected $redis; + protected $id; const HASH_NAME = 'eventstream_cursors'; - public function __construct(ClientInterface $redis) + public function __construct($id, ClientInterface $redis) { $this->redis = $redis; + $this->id = $id; } - public function fetch($stream) + public function fetch() { - return max($this->redis->hget(self::HASH_NAME, $stream), -1); + return max($this->redis->hget(self::HASH_NAME, $this->id), -1); } - public function increment($stream) + public function increment() { - return $this->redis->hincrby(self::HASH_NAME, $stream, 1); + return $this->redis->hincrby(self::HASH_NAME, $this->id, 1); } } \ No newline at end of file diff --git a/src/Eventor/EventStreamCursorInterface.php b/src/Eventor/EventStreamCursorInterface.php index 34c0a0f..9205101 100644 --- a/src/Eventor/EventStreamCursorInterface.php +++ b/src/Eventor/EventStreamCursorInterface.php @@ -4,6 +4,6 @@ interface EventStreamCursorInterface { - public function fetch($stream); - public function increment($stream); + public function fetch(); + public function increment(); } \ No newline at end of file diff --git a/src/Eventor/ServiceActivator.php b/src/Eventor/ServiceActivator.php index 769e160..0767813 100644 --- a/src/Eventor/ServiceActivator.php +++ b/src/Eventor/ServiceActivator.php @@ -15,8 +15,6 @@ public function addActivator($eventName, callable $a) public function traverseStream(EventStreamInterface $s, EventStreamCursorInterface $cursor) { foreach ($s as $event) { - $cursor->increment($s->getName()); - if (empty($this->activators[$event->getType()])) continue; try { @@ -26,6 +24,8 @@ public function traverseStream(EventStreamInterface $s, EventStreamCursorInterfa // @todo do something with exceptions here var_dump($e->getMessage()); } + + $cursor->increment(); } }