Skip to content

Commit

Permalink
再次重构Task, 未完成
Browse files Browse the repository at this point in the history
  • Loading branch information
onanying committed Jul 20, 2018
1 parent 77fa238 commit 9d59255
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 88 deletions.
10 changes: 5 additions & 5 deletions apps/crontab/commands/AssemblyLineCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use mix\task\CenterProcess;
use mix\task\LeftProcess;
use mix\task\RightProcess;
use mix\task\TaskExecutor;
use mix\task\ProcessPoolTaskExecutor;

/**
* 流水线模式范例
Expand All @@ -26,20 +26,20 @@ public function onInitialize()

/**
* 获取服务
* @return TaskExecutor
* @return ProcessPoolTaskExecutor
*/
public function getTaskService()
{
return create_object(
[
// 类路径
'class' => 'mix\task\TaskExecutor',
'class' => 'mix\task\ProcessPoolTaskExecutor',
// 服务名称
'name' => "mix-crontab: {$this->programName}",
// 执行类型
'type' => \mix\task\TaskExecutor::TYPE_CRONTAB,
'type' => \mix\task\ProcessPoolTaskExecutor::TYPE_CRONTAB,
// 执行模式
'mode' => \mix\task\TaskExecutor::MODE_ASSEMBLY_LINE,
'mode' => \mix\task\ProcessPoolTaskExecutor::MODE_ASSEMBLY_LINE,
// 左进程数
'leftProcess' => 1, // 定时任务类型,如果不为1,底层将自动调整为1
// 中进程数
Expand Down
10 changes: 5 additions & 5 deletions apps/crontab/commands/PushCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use mix\facades\Input;
use mix\task\CenterProcess;
use mix\task\LeftProcess;
use mix\task\TaskExecutor;
use mix\task\ProcessPoolTaskExecutor;

/**
* 推送模式范例
Expand All @@ -25,20 +25,20 @@ public function onInitialize()

/**
* 获取服务
* @return TaskExecutor
* @return ProcessPoolTaskExecutor
*/
public function getTaskService()
{
return create_object(
[
// 类路径
'class' => 'mix\task\TaskExecutor',
'class' => 'mix\task\ProcessPoolTaskExecutor',
// 服务名称
'name' => "mix-crontab: {$this->programName}",
// 执行类型
'type' => \mix\task\TaskExecutor::TYPE_CRONTAB,
'type' => \mix\task\ProcessPoolTaskExecutor::TYPE_CRONTAB,
// 执行模式
'mode' => \mix\task\TaskExecutor::MODE_PUSH,
'mode' => \mix\task\ProcessPoolTaskExecutor::MODE_PUSH,
// 左进程数
'leftProcess' => 1, // 定时任务类型,如果不为1,底层将自动调整为1
// 中进程数
Expand Down
108 changes: 36 additions & 72 deletions apps/daemon/commands/AssemblyLineCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

use mix\console\ExitCode;
use mix\facades\Input;
use mix\task\CenterProcess;
use mix\task\LeftProcess;
use mix\task\RightProcess;
use mix\task\TaskExecutor;
use mix\task\CenterWorker;
use mix\task\InputQueue;
use mix\task\LeftWorker;
use mix\task\OutputQueue;
use mix\task\RightWorker;
use mix\task\ProcessPoolTaskExecutor;

/**
* 流水线模式范例
Expand All @@ -28,28 +30,28 @@ public function onInitialize()

/**
* 获取服务
* @return TaskExecutor
* @return ProcessPoolTaskExecutor
*/
public function getTaskService()
{
return create_object(
[
// 类路径
'class' => 'mix\task\TaskExecutor',
'class' => 'mix\task\ProcessPoolTaskExecutor',
// 服务名称
'name' => "mix-daemon: {$this->programName}",
// 执行类型
'type' => \mix\task\TaskExecutor::TYPE_DAEMON,
'type' => \mix\task\ProcessPoolTaskExecutor::TYPE_DAEMON,
// 执行模式
'mode' => \mix\task\TaskExecutor::MODE_ASSEMBLY_LINE,
'mode' => \mix\task\ProcessPoolTaskExecutor::MODE_ASSEMBLY_LINE,
// 左进程数
'leftProcess' => 1,
// 中进程数
'centerProcess' => 5,
// 右进程数
'rightProcess' => 1,
// 任务超时时间 (秒)
'timeout' => 5,
// 队列名称
'queueName' => __FILE__,
]
);
}
Expand All @@ -64,83 +66,45 @@ public function actionStart()
// 启动服务
$service = $this->getTaskService();
$service->on('LeftStart', [$this, 'onLeftStart']);
$service->on('CenterStart', [$this, 'onCenterStart']);
$service->on('RightStart', [$this, 'onRightStart']);
$service->on('CenterMessage', [$this, 'onCenterMessage']);
$service->on('RightMessage', [$this, 'onRightMessage']);
$service->start();
// 返回退出码
return ExitCode::OK;
}

// 左进程启动事件回调函数
public function onLeftStart(LeftProcess $worker)
public function onLeftStart(LeftWorker $worker)
{
try {
// 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
$queueModel = new \apps\common\models\QueueModel();
// 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
for ($j = 0; $j < 16000; $j++) {
// 从消息队列中间件阻塞获取一条消息
$data = $queueModel->pop();
// 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->push($data);
}
} catch (\Exception $e) {
// 休息一会,避免 CPU 出现 100%
sleep(1);
// 抛出错误
throw $e;
// 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
$queueModel = new \apps\common\models\QueueModel();
// 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
for ($j = 1; $j < 16000; $j++) {
// 从消息队列中间件阻塞获取一条消息
//$data = $queueModel->pop();
// 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->send($j);
usleep(1000);
}
}

// 中进程启动事件回调函数
public function onCenterStart(CenterProcess $worker)
// 中进程消息事件回调函数
public function onCenterMessage(CenterWorker $worker, $data)
{
// 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
for ($j = 0; $j < 16000; $j++) {
$data = $worker->pop();
if (empty($data)) {
continue;
}
try {
// 对消息进行处理,比如:IP转换,经纬度转换等
// ...
// 将处理完成的消息推送给右进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->push($data);
} catch (\Exception $e) {
// 回退数据到消息队列
$worker->rollback($data);
// 休息一会,避免 CPU 出现 100%
sleep(1);
// 抛出错误
throw $e;
}
}
// 对消息进行处理,比如:IP转换,经纬度转换等
// ...
// 将处理完成的消息推送给右进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->send($data);
usleep(10000);
}

// 右进程启动事件回调函数
public function onRightStart(RightProcess $worker)
public function onRightMessage(RightWorker $worker, $data)
{
// 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
$tableModel = new \apps\common\models\TableModel();
// 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
for ($j = 0; $j < 16000; $j++) {
// 从进程队列中抢占一条消息
$data = $worker->pop();
if (empty($data)) {
continue;
}
try {
// 将处理完成的消息存入数据库
// ...
} catch (\Exception $e) {
// 回退数据到消息队列
$worker->rollback($data);
// 休息一会,避免 CPU 出现 100%
sleep(1);
// 抛出错误
throw $e;
}
}
// 将处理完成的消息存入数据库
// ...
var_dump($data);
usleep(1000);
}

}
14 changes: 8 additions & 6 deletions apps/daemon/commands/PushCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use mix\facades\Input;
use mix\task\CenterProcess;
use mix\task\LeftProcess;
use mix\task\TaskExecutor;
use mix\task\ProcessPoolTaskExecutor;

/**
* 推送模式范例
Expand All @@ -27,20 +27,20 @@ public function onInitialize()

/**
* 获取服务
* @return TaskExecutor
* @return ProcessPoolTaskExecutor
*/
public function getTaskService()
{
return create_object(
[
// 类路径
'class' => 'mix\task\TaskExecutor',
'class' => 'mix\task\ProcessPoolTaskExecutor',
// 服务名称
'name' => "mix-daemon: {$this->programName}",
// 执行类型
'type' => \mix\task\TaskExecutor::TYPE_DAEMON,
'type' => \mix\task\ProcessPoolTaskExecutor::TYPE_DAEMON,
// 执行模式
'mode' => \mix\task\TaskExecutor::MODE_PUSH,
'mode' => \mix\task\ProcessPoolTaskExecutor::MODE_PUSH,
// 左进程数
'leftProcess' => 1,
// 中进程数
Expand Down Expand Up @@ -78,7 +78,8 @@ public function onLeftStart(LeftProcess $worker)
// 从消息队列中间件阻塞获取一条消息
$data = $queueModel->pop();
// 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->push($data);
$worker->push($j+1);
sleep(10);
}
} catch (\Exception $e) {
// 休息一会,避免 CPU 出现 100%
Expand All @@ -102,6 +103,7 @@ public function onCenterStart(CenterProcess $worker)
try {
// 处理消息,比如:发送短信、发送邮件、微信推送
// ...
var_dump($data);
} catch (\Exception $e) {
// 回退数据到消息队列
$worker->rollback($data);
Expand Down

0 comments on commit 9d59255

Please sign in to comment.