Skip to content

Commit

Permalink
初步完成 DEMO 修改
Browse files Browse the repository at this point in the history
  • Loading branch information
onanying committed Jul 26, 2018
1 parent 9d59255 commit 36557c0
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 148 deletions.
91 changes: 33 additions & 58 deletions apps/crontab/commands/AssemblyLineCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

use mix\console\ExitCode;
use mix\facades\Input;
use mix\task\CenterProcess;
use mix\task\LeftProcess;
use mix\task\RightProcess;
use mix\task\CenterWorker;
use mix\task\LeftWorker;
use mix\task\ProcessPoolTaskExecutor;
use mix\task\RightWorker;

/**
* 流水线模式范例
Expand Down Expand Up @@ -35,19 +35,19 @@ public function getTaskService()
// 类路径
'class' => 'mix\task\ProcessPoolTaskExecutor',
// 服务名称
'name' => "mix-crontab: {$this->programName}",
// 执行类型
'type' => \mix\task\ProcessPoolTaskExecutor::TYPE_CRONTAB,
'name' => "mix-daemon: {$this->programName}",
// 执行模式
'mode' => \mix\task\ProcessPoolTaskExecutor::MODE_ASSEMBLY_LINE,
'mode' => ProcessPoolTaskExecutor::MODE_ASSEMBLY_LINE,
// 左进程数
'leftProcess' => 1, // 定时任务类型,如果不为1,底层将自动调整为1
'leftProcess' => 1,
// 中进程数
'centerProcess' => 5,
// 右进程数
'rightProcess' => 1,
// 任务超时时间 (秒)
'timeout' => 5,
// 最大执行次数
'maxExecutions' => 16000,
// 队列名称
'queueName' => __FILE__,
]
);
}
Expand All @@ -60,74 +60,49 @@ public function actionExec()
// 启动服务
$service = $this->getTaskService();
$service->on('LeftStart', [$this, 'onLeftStart']);
$service->on('CenterStart', [$this, 'onCenterStart']);
$service->on('RightStart', [$this, 'onRightStart']);
$service->on('CenterMessage', [$this, 'onCenterMessage']);
$service->on('RightMessage', [$this, 'onRightMessage']);
$service->start();
// 返回退出码
return ExitCode::OK;
}

// 左进程启动事件回调函数
public function onLeftStart(LeftProcess $worker)
public function onLeftStart(LeftWorker $worker)
{
// 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
$tableModel = new \apps\common\models\TableModel();
// 取出数据一行一行推送给中进程
/*
foreach ($tableModel->getAll() as $item) {
// 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->push($item);
$worker->send($item);
}
*/

for ($i = 1; $i < 16000; $i++) {
$worker->send($i);
}

}

// 中进程启动事件回调函数
public function onCenterStart(CenterProcess $worker)
// 中进程消息事件回调函数
public function onCenterMessage(CenterWorker $worker, $data)
{
// 保持任务执行状态,定时任务只能使用 while (true) 保持执行状态
while (true) {
$data = $worker->pop();
if (empty($data)) {
continue;
}
try {
// 对消息进行处理,比如:IP转换,经纬度转换等
// ...
// 将处理完成的消息推送给右进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->push($data);
} catch (\Exception $e) {
// 回退数据到消息队列
$worker->rollback($data);
// 休息一会,避免 CPU 出现 100%
sleep(1);
// 抛出错误
throw $e;
}
}
// 对消息进行处理,比如:IP转换,经纬度转换等
// ...
// 将处理完成的消息推送给右进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->send($data);
usleep(10000);
}

// 右进程启动事件回调函数
public function onRightStart(RightProcess $worker)
public function onRightMessage(RightWorker $worker, $data)
{
// 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
$tableModel = new \apps\common\models\TableModel();
// 保持任务执行状态,定时任务只能使用 while (true) 保持执行状态
while (true) {
// 从进程队列中抢占一条消息
$data = $worker->pop();
if (empty($data)) {
continue;
}
try {
// 将处理完成的消息存入数据库
// ...
} catch (\Exception $e) {
// 回退数据到消息队列
$worker->rollback($data);
// 休息一会,避免 CPU 出现 100%
sleep(1);
// 抛出错误
throw $e;
}
}
// 将处理完成的消息存入数据库
// ...
var_dump($data);
usleep(1000);
}

}
59 changes: 25 additions & 34 deletions apps/crontab/commands/PushCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

use mix\console\ExitCode;
use mix\facades\Input;
use mix\task\CenterProcess;
use mix\task\LeftProcess;
use mix\task\CenterWorker;
use mix\task\LeftWorker;
use mix\task\ProcessPoolTaskExecutor;

/**
Expand Down Expand Up @@ -34,17 +34,17 @@ public function getTaskService()
// 类路径
'class' => 'mix\task\ProcessPoolTaskExecutor',
// 服务名称
'name' => "mix-crontab: {$this->programName}",
// 执行类型
'type' => \mix\task\ProcessPoolTaskExecutor::TYPE_CRONTAB,
'name' => "mix-daemon: {$this->programName}",
// 执行模式
'mode' => \mix\task\ProcessPoolTaskExecutor::MODE_PUSH,
'mode' => ProcessPoolTaskExecutor::MODE_PUSH,
// 左进程数
'leftProcess' => 1, // 定时任务类型,如果不为1,底层将自动调整为1
'leftProcess' => 1,
// 中进程数
'centerProcess' => 5,
// 任务超时时间 (秒)
'timeout' => 5,
// 最大执行次数
'maxExecutions' => 16000,
// 队列名称
'queueName' => __FILE__,
]
);
}
Expand All @@ -57,47 +57,38 @@ public function actionExec()
// 启动服务
$service = $this->getTaskService();
$service->on('LeftStart', [$this, 'onLeftStart']);
$service->on('CenterStart', [$this, 'onCenterStart']);
$service->on('CenterMessage', [$this, 'onCenterMessage']);
$service->start();
// 返回退出码
return ExitCode::OK;
}

// 左进程启动事件回调函数
public function onLeftStart(LeftProcess $worker)
public function onLeftStart(LeftWorker $worker)
{
// 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
$tableModel = new \apps\common\models\TableModel();
// 取出数据一行一行推送给中进程
/*
foreach ($tableModel->getAll() as $item) {
// 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->push($item);
$worker->send($item);
}
*/

for ($i = 1; $i < 16000; $i++) {
$worker->send($i);
}

}

// 中进程启动事件回调函数
public function onCenterStart(CenterProcess $worker)
// 中进程消息事件回调函数
public function onCenterMessage(CenterWorker $worker, $data)
{
// 保持任务执行状态,定时任务只能使用 while (true) 保持执行状态
while (true) {
// 从进程消息队列中抢占一条消息
$data = $worker->pop();
if (empty($data)) {
continue;
}
// 处理消息
try {
// 处理消息,比如:发送短信、发送邮件、微信推送
// ...
} catch (\Exception $e) {
// 回退数据到消息队列
$worker->rollback($data);
// 休息一会,避免 CPU 出现 100%
sleep(1);
// 抛出错误
throw $e;
}
}
// 处理消息,比如:发送短信、发送邮件、微信推送
// ...
var_dump($data);
usleep(1000);
}

}
15 changes: 7 additions & 8 deletions apps/daemon/commands/AssemblyLineCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
use mix\console\ExitCode;
use mix\facades\Input;
use mix\task\CenterWorker;
use mix\task\InputQueue;
use mix\task\LeftWorker;
use mix\task\OutputQueue;
use mix\task\RightWorker;
use mix\task\ProcessPoolTaskExecutor;

Expand Down Expand Up @@ -40,16 +38,16 @@ public function getTaskService()
'class' => 'mix\task\ProcessPoolTaskExecutor',
// 服务名称
'name' => "mix-daemon: {$this->programName}",
// 执行类型
'type' => \mix\task\ProcessPoolTaskExecutor::TYPE_DAEMON,
// 执行模式
'mode' => \mix\task\ProcessPoolTaskExecutor::MODE_ASSEMBLY_LINE,
'mode' => ProcessPoolTaskExecutor::MODE_ASSEMBLY_LINE | ProcessPoolTaskExecutor::MODE_DAEMON,
// 左进程数
'leftProcess' => 1,
// 中进程数
'centerProcess' => 5,
// 右进程数
'rightProcess' => 1,
// 最大执行次数
'maxExecutions' => 16000,
// 队列名称
'queueName' => __FILE__,
]
Expand Down Expand Up @@ -78,12 +76,13 @@ public function onLeftStart(LeftWorker $worker)
{
// 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
$queueModel = new \apps\common\models\QueueModel();
// 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
for ($j = 1; $j < 16000; $j++) {
// 通过循环保持任务执行状态
while (true) {
static $i = 1;
// 从消息队列中间件阻塞获取一条消息
//$data = $queueModel->pop();
// 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->send($j);
$worker->send($i++);
usleep(1000);
}
}
Expand Down
73 changes: 25 additions & 48 deletions apps/daemon/commands/PushCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

use mix\console\ExitCode;
use mix\facades\Input;
use mix\task\CenterProcess;
use mix\task\LeftProcess;
use mix\task\CenterWorker;
use mix\task\LeftWorker;
use mix\task\ProcessPoolTaskExecutor;

/**
Expand Down Expand Up @@ -37,16 +37,16 @@ public function getTaskService()
'class' => 'mix\task\ProcessPoolTaskExecutor',
// 服务名称
'name' => "mix-daemon: {$this->programName}",
// 执行类型
'type' => \mix\task\ProcessPoolTaskExecutor::TYPE_DAEMON,
// 执行模式
'mode' => \mix\task\ProcessPoolTaskExecutor::MODE_PUSH,
'mode' => ProcessPoolTaskExecutor::MODE_PUSH | ProcessPoolTaskExecutor::MODE_DAEMON,
// 左进程数
'leftProcess' => 1,
// 中进程数
'centerProcess' => 5,
// 任务超时时间 (秒)
'timeout' => 5,
// 最大执行次数
'maxExecutions' => 16000,
// 队列名称
'queueName' => __FILE__,
]
);
}
Expand All @@ -61,58 +61,35 @@ public function actionStart()
// 启动服务
$service = $this->getTaskService();
$service->on('LeftStart', [$this, 'onLeftStart']);
$service->on('CenterStart', [$this, 'onCenterStart']);
$service->on('CenterMessage', [$this, 'onCenterMessage']);
$service->start();
// 返回退出码
return ExitCode::OK;
}

// 左进程启动事件回调函数
public function onLeftStart(LeftProcess $worker)
public function onLeftStart(LeftWorker $worker)
{
try {
// 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
$queueModel = new \apps\common\models\QueueModel();
// 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
for ($j = 0; $j < 16000; $j++) {
// 从消息队列中间件阻塞获取一条消息
$data = $queueModel->pop();
// 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->push($j+1);
sleep(10);
}
} catch (\Exception $e) {
// 休息一会,避免 CPU 出现 100%
sleep(1);
// 抛出错误
throw $e;
// 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
$queueModel = new \apps\common\models\QueueModel();
// 通过循环保持任务执行状态
while (true) {
static $i = 1;
// 从消息队列中间件阻塞获取一条消息
//$data = $queueModel->pop();
// 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->send($i++);
usleep(1000);
}
}

// 中进程启动事件回调函数
public function onCenterStart(CenterProcess $worker)
// 中进程消息事件回调函数
public function onCenterMessage(CenterWorker $worker, $data)
{
// 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
for ($j = 0; $j < 16000; $j++) {
// 从进程消息队列中抢占一条消息
$data = $worker->pop();
if (empty($data)) {
continue;
}
// 处理消息
try {
// 处理消息,比如:发送短信、发送邮件、微信推送
// ...
var_dump($data);
} catch (\Exception $e) {
// 回退数据到消息队列
$worker->rollback($data);
// 休息一会,避免 CPU 出现 100%
sleep(1);
// 抛出错误
throw $e;
}
}
// 处理消息,比如:发送短信、发送邮件、微信推送
// ...
var_dump($data);
usleep(10000);
}

}

0 comments on commit 36557c0

Please sign in to comment.