Skip to content

Commit

Permalink
Update StreamSelectLoop.php
Browse files Browse the repository at this point in the history
  • Loading branch information
walkor authored Apr 11, 2018
1 parent c697350 commit ab3cad9
Showing 1 changed file with 3 additions and 163 deletions.
166 changes: 3 additions & 163 deletions Events/React/StreamSelectLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,175 +12,15 @@
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman\Events\React;
use Workerman\Events\EventInterface;

/**
* Class StreamSelectLoop
* @package Workerman\Events\React
*/
class StreamSelectLoop extends \React\EventLoop\StreamSelectLoop
class StreamSelectLoop extends Base
{
/**
* @var array
*/
protected $_timerIdMap = array();

/**
* @var int
*/
protected $_timerIdIndex = 0;

/**
* Add event listener to event loop.
*
* @param $fd
* @param $flag
* @param $func
* @param array $args
* @return bool
*/
public function add($fd, $flag, $func, $args = array())
{
$args = (array)$args;
switch ($flag) {
case EventInterface::EV_READ:
return $this->addReadStream($fd, $func);
case EventInterface::EV_WRITE:
return $this->addWriteStream($fd, $func);
case EventInterface::EV_SIGNAL:
return $this->addSignal($fd, $func);
case EventInterface::EV_TIMER:
$timer_obj = $this->addPeriodicTimer($fd, function() use ($func, $args) {
call_user_func_array($func, $args);
});
$this->_timerIdMap[++$this->_timerIdIndex] = $timer_obj;
return $this->_timerIdIndex;
case EventInterface::EV_TIMER_ONCE:
$index = ++$this->_timerIdIndex;
$timer_obj = $this->addTimer($fd, function() use ($func, $args, $index) {
$this->del($index,EventInterface::EV_TIMER_ONCE);
call_user_func_array($func, $args);
});
$this->_timerIdMap[$index] = $timer_obj;
return $this->_timerIdIndex;
}
return false;
}

/**
* Remove event listener from event loop.
*
* @param mixed $fd
* @param int $flag
* @return bool
*/
public function del($fd, $flag)
{
switch ($flag) {
case EventInterface::EV_READ:
return $this->removeReadStream($fd);
case EventInterface::EV_WRITE:
return $this->removeWriteStream($fd);
case EventInterface::EV_SIGNAL:
return $this->removeSignal($fd);
case EventInterface::EV_TIMER:
case EventInterface::EV_TIMER_ONCE:
if (isset($this->_timerIdMap[$fd])){
$timer_obj = $this->_timerIdMap[$fd];
unset($this->_timerIdMap[$fd]);
$this->cancelTimer($timer_obj);
return true;
}
}
return false;
}


/**
* Main loop.
*
* @return void
*/
public function loop()
{
$this->run();
}

/**
* Add signal handler.
*
* @param $signal
* @param $callback
* @return bool
*/
public function addSignal($signal, $callback)
{
if(DIRECTORY_SEPARATOR === '/') {
pcntl_signal($signal, $callback);
}
}

/**
* Remove signal handler.
*
* @param $signal
*/
public function removeSignal($signal)
{
if(DIRECTORY_SEPARATOR === '/') {
pcntl_signal($signal, SIG_IGN);
}
}

/**
* Emulate a stream_select() implementation that does not break when passed
* empty stream arrays.
*
* @param array &$read An array of read streams to select upon.
* @param array &$write An array of write streams to select upon.
* @param integer|null $timeout Activity timeout in microseconds, or null to wait forever.
*
* @return integer|false The total number of streams that are ready for read/write.
* Can return false if stream_select() is interrupted by a signal.
*/
protected function streamSelect(array &$read, array &$write, $timeout)
{
if ($read || $write) {
$except = null;
// Calls signal handlers for pending signals
if(DIRECTORY_SEPARATOR === '/') {
pcntl_signal_dispatch();
}
// suppress warnings that occur, when stream_select is interrupted by a signal
return @stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout);
}

// Calls signal handlers for pending signals
if(DIRECTORY_SEPARATOR === '/') {
pcntl_signal_dispatch();
}
$timeout && usleep($timeout);

return 0;
}

/**
* Destroy loop.
*
* @return void
*/
public function destroy()
{

}

/**
* Get timer count.
*
* @return integer
*/
public function getTimerCount()
public function __construct()
{
return count($this->_timerIdMap);
$this->_eventLoop = new \React\EventLoop\StreamSelectLoop();
}
}

0 comments on commit ab3cad9

Please sign in to comment.