swoole异步task邮件案例

有人住高楼,有人在深沟,有人光万丈,有人一身锈。世人万千种,浮云莫去求,斯人若彩虹,遇上方知有。

我们在写代码的过程中应该注意的问题:

  • 开启数量适中的Worker进程和Task进程
  • 守护进程化
  • 配置运行时日志
  • 平滑重启
  • 避免内存泄漏
  • 避免粘包问题

除此之外,跟swoole打交道,我们还应该注意下面这些问题:

  • 为了避免Worker阻塞,避免使用sleep等睡眠函数
  • 不要使用die或者exit函数,即使在你调试的时候
  • 保持良好的代码风格,try/catch捕获异常
  • 如果Worker进程无法预料会发生异常退出,虽然Manager进程会重新拉起新的Worker进程,但是我们可以通过register_shutdown_function方法在进程退出前“善后”

邮件案例

首先发送邮件,我们借助第三方类库 swiftmailer。有些框架可能集成了swiftmailer,比如yii2,本来准备在yii2的基础之上来讲,考虑部分人可能对这个框架不熟悉,我们这里直接根据swiftmailer代码操作,框架中一样可以使用,无任何影响。

我们执行下面的命令,把swiftmailer下载到本地,下载好之后swiftmailer会被下载到一个叫vendor文件夹的目录里面

1
composer require "swiftmailer/swiftmailer:^6.0"

然后我们封装一个简单的邮件类Mailer.php,同vendor目录同级,用于发送邮件,该类后期可自行完善,比如增加批量发送邮件或者增加发送模版邮件等操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<?php
require_once __DIR__ . '/vendor/autoload.php';

class Mailer
{
public $transport;
public $mailer;
/**
* 发送邮件类 参数 $data 需要三个必填项 包括 邮件主题`$data['subject']`、接收邮件的人`$data['to']`和邮件内容 `$data['content']`
* @param Array $data
* @return bool $result 发送成功 or 失败
*/
public function send($data)
{
$this->transport = (new Swift_SmtpTransport('smtp.qq.com', 25))
->setEncryption('tls')
->setUsername('bailangzhan@qq.com')
->setPassword('xxxxxx');
$this->mailer = new Swift_Mailer($this->transport);

$message = (new Swift_Message($data['subject']))
->setFrom(array('bailangzhan@qq.com' => '白狼栈'))
->setTo(array($data['to']))
->setBody($data['content']);

$result = $this->mailer->send($message);

// 释放
$this->destroy();
return $result;
}
public function destroy()
{
$this->transport = null;
$this->mailer = null;
}
}

在这段代码中,你需要修改的地方包括 Host、Post、Encryption、Username、Password和From。

Mailer类简单的封装好之后,我们写几行代码测试下你的邮件类是否可以正确的使用

1
2
3
4
5
6
7
8
require_once __DIR__ . "/task/Mailer.php";
$data = [
'to' => '422744***@qq.com',
'subject' => 'just a test',
'content' => 'This is just a test.',
];
$mailer = new Mailer;
$mailer->send($data);

to是要发送给谁,subject邮件标题,content邮件内容。

如果不可以正常发送,请检查swiftmailer相关类正确引入并且保证Mailer类的配置可用。

邮件类准备好之后,我们正式开始写swoole server,主要代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
<?php
class TaskServer
{
private $_serv;
private $_run;
/**
* init
*/
public function __construct()
{
$this->_serv = new Swoole\Server("127.0.0.1", 9501);
$this->_serv->set([
'worker_num' => 2,
'daemonize' => false,
'log_file' => __DIR__ . '/server.log',
'task_worker_num' => 2,
'max_request' => 5000,
'task_max_request' => 5000,
'open_eof_check' => true, //打开EOF检测
'package_eof' => "\r\n", //设置EOF
'open_eof_split' => true, // 自动分包
]);
$this->_serv->on('Connect', [$this, 'onConnect']);
$this->_serv->on('Receive', [$this, 'onReceive']);
$this->_serv->on('WorkerStart', [$this, 'onWorkerStart']);
$this->_serv->on('Task', [$this, 'onTask']);
$this->_serv->on('Finish', [$this, 'onFinish']);
$this->_serv->on('Close', [$this, 'onClose']);
}
public function onConnect($serv, $fd, $fromId)
{
}
public function onWorkerStart($serv, $workerId)
{
require_once __DIR__ . "/TaskRun.php";
$this->_run = new TaskRun;
}
public function onReceive($serv, $fd, $fromId, $data)
{
$data = $this->unpack($data);
$this->_run->receive($serv, $fd, $fromId, $data);
// 投递一个任务到task进程中
if (!empty($data['event'])) {
$serv->task(array_merge($data , ['fd' => $fd]));
}
}
public function onTask($serv, $taskId, $fromId, $data)
{
$this->_run->task($serv, $taskId, $fromId, $data);
}
public function onFinish($serv, $taskId, $data)
{
$this->_run->finish($serv, $taskId, $data);
}
public function onClose($serv, $fd, $fromId)
{
}
/**
* 对数据包单独处理,数据包经过`json_decode`处理之后,只能是数组
* @param $data
* @return bool|mixed
*/
public function unpack($data)
{
$data = str_replace("\r\n", '', $data);
if (!$data) {
return false;
}
$data = json_decode($data, true);
if (!$data || !is_array($data)) {
return false;
}
return $data;
}
public function start()
{
$this->_serv->start();
}
}
$reload = new TaskServer;
$reload->start();

简单分析下:

  • 在onWorkerStart回调内,我们引入了实际处理业务逻辑的类TaskRun.php,为什么这么说呢?因为我们在onReceive\onTask\onFinish回调内均把数据交给了TaskRun对象去处理了
  • 我们约定,每个数据包都必须带有EOF标记\r\n,在server端为了更好的处理数据,onReceive回调内我们把数据包丢给了unpack方法处理,该方法的目的就是把数据包的EOF标记去掉,还原真实的数据包。我们还约定,server收到的数据包经过unpack处理之后只能是数组,非数组在unpack中就被直接处理掉了。
  • onReceive回调内,我们看到,只有数据包含有event项才会被投递给Task进程,这样做的原因是Task进程可能要处理各种任务,增加event项是为了表明投递过来的任务是要做什么的。

我们看TaskRun的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<?php
require_once ('./TaskClient.php');
require_once ('./Mailer.php');
class TaskRun
{
public function receive($serv, $fd, $fromId, $data)
{
}
public function task($serv, $taskId, $fromId, $data)
{
try {
switch ($data['event']) {
case TaskClient::EVENT_TYPE_SEND_MAIL:
$mailer = new Mailer;
$result = $mailer->send($data);
break;
default:
break;
}
return $result;
} catch (\Exception $e) {
throw new \Exception('task exception :' . $e->getMessage());
}
}
public function finish($serv, $taskId, $data)
{
return true;
}
}

目前,我们主要就一个业务,“发送邮件”,所以TaskRun类的实现现在看来非常简单。

因为发邮件是一件比较耗时的任务,所以我们这里完善的是task回调。我们根据投递给Task进程的数据类型,判断投递过来的数据是要做什么。比如我们这里有一项event,等于TaskClient::EVENT_TYPE_SEND_MAIL,这一项就是发送邮件的标识,如果要投递的任务的event项等于TaskClient::EVENT_TYPE_SEND_MAIL,就表明这个任务是邮件任务,程序上就可以通过switch去处理邮件了。

TaskClient是什么呢?这是一个封装好的客户端处理类,我们来看下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<?php
class TaskClient
{
private $client;
const EVENT_TYPE_SEND_MAIL = 'send-mail';
public function __construct ()
{
$this->client = new Swoole\Client(SWOOLE_SOCK_TCP);
if (!$this->client->connect('127.0.0.1', 9501)) {
$msg = 'swoole client connect failed.';
throw new \Exception("Error: {$msg}.");
}
}
/**
* @param $data Array
* send data
*/
public function sendData ($data)
{
$data = $this->togetherDataByEof($data);
$this->client->send($data);
}
/**
* 数据末尾拼接EOF标记
* @param Array $data 要处理的数据
* @return String json_encode($data) . EOF
*/
public function togetherDataByEof($data)
{
if (!is_array($data)) {
return false;
}
return json_encode($data) . "\r\n";
}
}