Skip to content
This repository has been archived by the owner on Apr 4, 2019. It is now read-only.

Commit

Permalink
adds support for named cursors
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-haproff committed Jul 29, 2016
1 parent 0913269 commit 851302d
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 9 deletions.
12 changes: 7 additions & 5 deletions src/Eventor/EventStreamCursor.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,23 @@
class EventStreamCursor implements EventStreamCursorInterface
{
protected $redis;
protected $id;

const HASH_NAME = 'eventstream_cursors';

public function __construct(ClientInterface $redis)
public function __construct($id, ClientInterface $redis)
{
$this->redis = $redis;
$this->id = $id;
}

public function fetch($stream)
public function fetch()
{
return max($this->redis->hget(self::HASH_NAME, $stream), -1);
return max($this->redis->hget(self::HASH_NAME, $this->id), -1);
}

public function increment($stream)
public function increment()
{
return $this->redis->hincrby(self::HASH_NAME, $stream, 1);
return $this->redis->hincrby(self::HASH_NAME, $this->id, 1);
}
}
4 changes: 2 additions & 2 deletions src/Eventor/EventStreamCursorInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

interface EventStreamCursorInterface
{
public function fetch($stream);
public function increment($stream);
public function fetch();
public function increment();
}
4 changes: 2 additions & 2 deletions src/Eventor/ServiceActivator.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ public function addActivator($eventName, callable $a)
public function traverseStream(EventStreamInterface $s, EventStreamCursorInterface $cursor)
{
foreach ($s as $event) {
$cursor->increment($s->getName());

if (empty($this->activators[$event->getType()])) continue;

try {
Expand All @@ -26,6 +24,8 @@ public function traverseStream(EventStreamInterface $s, EventStreamCursorInterfa
// @todo do something with exceptions here
var_dump($e->getMessage());
}

$cursor->increment();
}
}

Expand Down

0 comments on commit 851302d

Please sign in to comment.