swoole结合php实现多任务异步处理

走出来的是因为从来没进去过, 进去了的, 我从没见过走出来的
大部分时候我们项目的业务逻辑是同步阻塞运行的,但是有些时候会遇到一些耗时较大的操作,比如向十万个用户群发通知邮件,我们的程序不可能等待十万次循环后再执行其他操作,这种时候我们会采用异步操作,由 worker 进程向 task 进程发送任务,task 进程处理完全部任务之后通过onFinish回调函数通知 worker 进程。 ### 创建命令行类 创建application/console/AsyncTask.php文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
<?php
namespace App\Console;

use think\console\Command;
use think\console\Input;
use think\console\Output;


class AsyncTask extends Command
{
protected $server;

// 设置命令行名称,以及命令行描述
protected function configure()
{
$this->setName('task:start')->setDescription('Start TCP(Timer) Server!');
}

protected function execute(Input $input, Output $output)
{
$this->server = new \swoole_server("0.0.0.0", 9501);
$this->server->set([
'worker_num' => 4,
'daemonize' => false,
'task_worker_num' => 4 # task 进程数
]);
$this->server->on('Start', [$this, 'onStart']);
$this->server->on('Connect', [$this, 'onConnect']);
$this->server->on('Receive', [$this, 'onReceive']);
$this->server->on('Task', [$this, 'onTask']);
$this->server->on('Finish', [$this, 'onFinish']);
$this->server->on('Close', [$this, 'onClose']);
$this->server->start();
}
// 主进程启动时回调函数
public function onStart(\swoole_server $server)
{
echo "开始\n";
}
// 建立连接时回调函数
public function onConnect(\swoole_server $server, $fd, $from_id)
{
echo "连接上了\n";
}

public function onReceive(\swoole_server $server, $fd, $from_id, $data)
{
//echo "message: {$data} form Client: {$fd} \n";
// 投递异步任务
$task_id = $server->task($data);
// echo "Dispath AsyncTask: id={$task_id}\n";
// 将受到的客户端消息再返回给客户端
$server->send($fd, "Message form Server: {$data}, task_id: {$task_id}");
}


// 异步任务处理函数
public function onTask(\swoole_server $server, $task_id, $from_id, $data)
{
echo "{$task_id}, 任务处理 \n";
$server->finish("$data -> OK");
}

public function onFinish(\swoole_server $server, $task_id, $data)
{
echo "结束";
}

// 关闭连时回调函数
public function onClose(\swoole_server $server, $fd, $from_id)
{
echo "结束\n";
}

}

修改配置文件

文件所在 application/command.php

1
2
3
4
5
<?php

return [
'app\console\AsyncTask',
];

启动任务

1
php think task:start

使用 telnet 进行测试

建议使用 xshell

1
telnet 60.205.200.40 9501

连接上服务器后,输入hello回车,发送消息给 TCP 服务器,将受到一下回执

http://www.iswoole.com/article/2066