server/app/Event/MQTTHandle.php

186 lines
6.5 KiB
PHP

<?php
declare(strict_types=1);
namespace App\Event;
use App\Model\Device;
use Hyperf\Context\ApplicationContext;
use Hyperf\HttpMessage\Server\Response;
use Hyperf\HttpMessage\Stream\SwooleStream;
use Hyperf\MqttServer\Annotation\MQTTConnect;
use Hyperf\MqttServer\Annotation\MQTTDisconnect;
use Hyperf\MqttServer\Annotation\MQTTPingReq;
use Hyperf\MqttServer\Annotation\MQTTPublish;
use Hyperf\MqttServer\Annotation\MQTTSubscribe;
use Hyperf\MqttServer\Annotation\MQTTUnsubscribe;
use Hyperf\MqttServer\Handler\HandlerInterface;
use Hyperf\MqttServer\Handler\ResponseRewritable;
use Hyperf\Redis\Redis;
use Psr\Http\Message\ServerRequestInterface;
use Simps\MQTT\Protocol\Types;
use Simps\MQTT\Protocol\V3;
use Swoole\Server;
/**
* MQTT服务
* Author: cfn <cfn@leapy.cn>.
*/
class MQTTHandle implements HandlerInterface
{
use ResponseRewritable;
#[MQTTConnect()]
public function handle(ServerRequestInterface $request, Response $response): Response
{
$data = $request->getParsedBody();
if ($data['protocol_name'] != 'MQTT') {
return $response->withAttribute('closed', true);
}
if (! $this->isRewritable($response)) {
return $response;
}
// 判断是否是发布客户端
if ($data['client_id'] == 'publisher') {
return $response;
}
var_dump($data);
// 判断设备是否存在
$has = Device::snHas($data['client_id']);
if (! $has) {
return $response->withAttribute('closed', true);
}
$container = ApplicationContext::getContainer();
$redis = $container->get(Redis::class);
$fd = $request->getAttribute('fd');
// 记录设备活动时间
$redis->set('MQTT_ACTIVE_TIME:' . $fd, time());
$redis->set('MQTT_CLIENT_FD:' . $data['client_id'], $fd);
return $response;
}
#[MQTTPingReq()]
public function pingReqHandle(ServerRequestInterface $request, Response $response): Response
{
if (! $this->isRewritable($response)) {
return $response;
}
$container = ApplicationContext::getContainer();
$redis = $container->get(Redis::class);
$fd = $request->getAttribute('fd');
// 记录设备活动时间
$redis->set('MQTT_ACTIVE_TIME:' . $fd, time());
$redis->set('MQTT_CLIENT_FD:' . $fd, time());
return $response->withBody(new SwooleStream(V3::pack(
['type' => Types::PINGRESP]
)));
}
#[MQTTPublish()]
public function publishHandle(ServerRequestInterface $request, Response $response): Response
{
$container = ApplicationContext::getContainer();
$redis = $container->get(Redis::class);
/** @var Server $server */
$server = $response->getAttribute('server');
$fd = $request->getAttribute('fd');
$data = $request->getParsedBody();
$arr = json_decode($redis->get('MQTT_TOPIC_FDS:' . $data['topic'])) ?? [];
// 所有连接的设备主题推送
foreach ($server->connections as $targetFd) {
if ($targetFd != $fd && in_array($targetFd, $arr)) {
$server->send(
$targetFd,
V3::pack(
[
'type' => $data['type'],
'topic' => $data['topic'],
'message' => $data['message'],
'dup' => $data['dup'],
'qos' => $data['qos'],
'retain' => $data['retain'],
'message_id' => $data['message_id'] ?? '',
]
)
);
}
}
if ($data['qos'] === 1) {
$response = $response->withBody(new SwooleStream(V3::pack(
[
'type' => Types::PUBACK,
'message_id' => $data['message_id'] ?? '',
]
)));
}
return $response;
}
#[MQTTSubscribe()]
public function subscribeHandle(ServerRequestInterface $request, Response $response): Response
{
$container = ApplicationContext::getContainer();
$redis = $container->get(Redis::class);
$fd = $request->getAttribute('fd');
$data = $request->getParsedBody();
$payload = [];
foreach ($data['topics'] as $k => $qos) {
if (is_numeric($qos) && $qos < 3) {
$arr = json_decode($redis->get('MQTT_TOPIC_FDS:' . $k)) ?? [];
if (! in_array($fd, $arr)) {
$arr[] = $fd;
$redis->set('MQTT_TOPIC_FDS:' . $k, json_encode($arr));
}
$payload[] = $qos;
} else {
$payload[] = 0x80;
}
}
$redis->set('MQTT_CLIENT_FD:' . $fd, time());
return $response->withBody(new SwooleStream(V3::pack(
[
'type' => Types::SUBACK,
'message_id' => $data['message_id'] ?? '',
'codes' => $payload,
'topics' => $data['topics'],
]
)));
}
#[MQTTUnsubscribe()]
public function unsubscribeHandle(ServerRequestInterface $request, Response $response): Response
{
$container = ApplicationContext::getContainer();
$redis = $container->get(Redis::class);
$fd = $request->getAttribute('fd');
$data = $request->getParsedBody();
foreach ($data['topics'] as $k => $qos) {
$arr = json_decode($redis->get('MQTT_TOPIC_FDS:' . $k)) ?? [];
if (in_array($fd, $arr)) {
$arr = array_filter($arr, function ($val) use ($fd) {
return $val != $fd;
});
$redis->set('MQTT_TOPIC_FDS:' . $k, json_encode($arr));
}
}
return $response->withBody(new SwooleStream(V3::pack(
[
'type' => Types::UNSUBACK,
'message_id' => $data['message_id'] ?? '',
'topics' => $data['topics'],
]
)));
}
#[MQTTDisconnect()]
public function disconnectHandle(ServerRequestInterface $request, Response $response): Response
{
$container = ApplicationContext::getContainer();
$redis = $container->get(Redis::class);
$fd = $request->getAttribute('fd');
$redis->del('MQTT_CLIENT_FD:' . $fd);
return $response->withAttribute('closed', true);
}
}