Skip to content

Commit

Permalink
完善 task demo
Browse files Browse the repository at this point in the history
  • Loading branch information
onanying committed Jul 26, 2018
1 parent 36557c0 commit 57dc0e6
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 42 deletions.
20 changes: 6 additions & 14 deletions apps/crontab/commands/AssemblyLineCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,37 +72,29 @@ public function onLeftStart(LeftWorker $worker)
{
// 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
$tableModel = new \apps\common\models\TableModel();
// 取出数据一行一行推送给中进程
/*
// 取出全量数据一行一行推送给中进程去处理
foreach ($tableModel->getAll() as $item) {
// 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
// 将消息发送给中进程去处理,消息有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->send($item);
}
*/

for ($i = 1; $i < 16000; $i++) {
$worker->send($i);
}

}

// 中进程消息事件回调函数
public function onCenterMessage(CenterWorker $worker, $data)
{
// 对消息进行处理,比如:IP转换,经纬度转换等
// ...
// 将处理完成的消息推送给右进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
// 将处理完成的消息发送给右进程去处理,消息有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->send($data);
usleep(10000);
}

// 右进程启动事件回调函数
public function onRightMessage(RightWorker $worker, $data)
{
// 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
$tableModel = new \apps\common\models\TableModel();
// 将处理完成的消息存入数据库
// ...
var_dump($data);
usleep(1000);
$tableModel->insert($data);
}

}
13 changes: 2 additions & 11 deletions apps/crontab/commands/PushCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,27 +68,18 @@ public function onLeftStart(LeftWorker $worker)
{
// 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
$tableModel = new \apps\common\models\TableModel();
// 取出数据一行一行推送给中进程
/*
// 取出全量数据一行一行推送给中进程去处理
foreach ($tableModel->getAll() as $item) {
// 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
// 将消息发送给中进程去处理,消息有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->send($item);
}
*/

for ($i = 1; $i < 16000; $i++) {
$worker->send($i);
}

}

// 中进程消息事件回调函数
public function onCenterMessage(CenterWorker $worker, $data)
{
// 处理消息,比如:发送短信、发送邮件、微信推送
// ...
var_dump($data);
usleep(1000);
}

}
21 changes: 11 additions & 10 deletions apps/daemon/commands/AssemblyLineCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,14 @@ public function onLeftStart(LeftWorker $worker)
$queueModel = new \apps\common\models\QueueModel();
// 通过循环保持任务执行状态
while (true) {
static $i = 1;
// 从消息队列中间件阻塞获取一条消息
//$data = $queueModel->pop();
// 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->send($i++);
usleep(1000);
$data = $queueModel->pop();
/**
* 将消息发送给中进程去处理,消息有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
* 发送方法内有信号判断处理,当接收到重启、停止信号会立即退出左进程
* 当发送的数据为空时,并不会触发 onCenterMessage,但可以触发信号判断处理,所以当 pop 为空时,请照常 send 给中进程。
*/
$worker->send($data);
}
}

Expand All @@ -92,18 +94,17 @@ public function onCenterMessage(CenterWorker $worker, $data)
{
// 对消息进行处理,比如:IP转换,经纬度转换等
// ...
// 将处理完成的消息推送给右进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
// 将处理完成的消息发送给右进程去处理,消息有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->send($data);
usleep(10000);
}

// 右进程启动事件回调函数
public function onRightMessage(RightWorker $worker, $data)
{
// 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
$tableModel = new \apps\common\models\TableModel();
// 将处理完成的消息存入数据库
// ...
var_dump($data);
usleep(1000);
$tableModel->insert($data);
}

}
14 changes: 7 additions & 7 deletions apps/daemon/commands/PushCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,14 @@ public function onLeftStart(LeftWorker $worker)
$queueModel = new \apps\common\models\QueueModel();
// 通过循环保持任务执行状态
while (true) {
static $i = 1;
// 从消息队列中间件阻塞获取一条消息
//$data = $queueModel->pop();
// 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->send($i++);
usleep(1000);
$data = $queueModel->pop();
/**
* 将消息发送给中进程去处理,消息有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
* 发送方法内有信号判断处理,当接收到重启、停止信号会立即退出左进程
* 当发送的数据为空时,并不会触发 onCenterMessage,但可以触发信号判断处理,所以当 pop 为空时,请照常 send 给中进程。
*/
$worker->send($data);
}
}

Expand All @@ -88,8 +90,6 @@ public function onCenterMessage(CenterWorker $worker, $data)
{
// 处理消息,比如:发送短信、发送邮件、微信推送
// ...
var_dump($data);
usleep(10000);
}

}

0 comments on commit 57dc0e6

Please sign in to comment.