server/app/Job/NotifyJob.php

64 lines
1.9 KiB
PHP

<?php
namespace App\Job;
use App\Utils\QueueClient;
use Hyperf\AsyncQueue\Job;
use App\Model\Message as mModel;
use App\Model\Account as aModel;
use App\Model\AccountMessage as amModel;
use App\Model\MessageChannel as mcModel;
/**
 * Queue job that fans a stored message out to its recipients.
 *
 * For every recipient it writes an account/message row. When channels were
 * supplied it additionally writes one message-channel row per
 * (recipient, channel) pair and enqueues a SendChannelJob for that pair.
 */
class NotifyJob extends Job
{
    /**
     * Number of rows written per bulk insert.
     *
     * A "system" message may target every account, so the rows are written in
     * bounded batches instead of one arbitrarily large INSERT statement.
     */
    private const INSERT_CHUNK_SIZE = 500;

    // Retry count
    protected int $maxAttempts = 1;

    protected ?int $messageId;

    protected ?array $userIds;

    protected ?array $channels;

    /**
     * @param array $params Job payload; the keys read are "messageId",
     *                      "userIds" and "channels". Missing keys become null.
     */
    public function __construct(array $params)
    {
        $this->messageId = $params["messageId"] ?? null;
        $this->userIds = $params["userIds"] ?? null;
        $this->channels = $params["channels"] ?? null;
    }

    /**
     * Run the job.
     *
     * @return bool Always true; a missing message or an empty recipient list
     *              ends the job early without doing any work.
     */
    public function handle()
    {
        // Nothing can be looked up without a message id.
        if ($this->messageId === null) {
            return true;
        }

        // Load the message record.
        $message = mModel::getById($this->messageId);
        if (empty($message)) {
            return true;
        }

        // Resolve recipients into a local list so the job's own state is not
        // replaced by the (potentially very large) list of all accounts.
        $recipients = $this->userIds;
        if (empty($recipients) && $message['type'] === "system") {
            // System messages without explicit recipients go to every account.
            $recipients = aModel::pluck("account_id")->toArray();
        }
        if (empty($recipients)) {
            return true;
        }

        $this->storeAccountMessages($recipients);

        // Multi-channel delivery is optional.
        if (empty($this->channels)) {
            return true;
        }
        $this->dispatchChannels($recipients);

        return true;
    }

    /**
     * Bulk-insert one account/message row per recipient, in bounded batches.
     *
     * NOTE(review): batches are inserted independently, so a failure part-way
     * through leaves earlier batches written — confirm this is acceptable.
     *
     * @param array $recipients Account ids to write rows for.
     */
    private function storeAccountMessages(array $recipients): void
    {
        // One timestamp for the whole run keeps every row's create_time equal.
        $createdAt = date("Y-m-d H:i:s");

        foreach (array_chunk($recipients, self::INSERT_CHUNK_SIZE) as $idBatch) {
            $rows = [];
            foreach ($idBatch as $id) {
                $rows[] = [
                    'account_id' => $id,
                    'message_id' => $this->messageId,
                    'create_time' => $createdAt,
                ];
            }
            amModel::insert($rows);
        }
    }

    /**
     * Record and enqueue one channel delivery per (recipient, channel) pair.
     *
     * @param array $recipients Account ids to deliver to.
     */
    private function dispatchChannels(array $recipients): void
    {
        foreach ($recipients as $id) {
            foreach ($this->channels as $channel) {
                mcModel::add([
                    'account_id' => $id,
                    'message_id' => $this->messageId,
                    'channel' => $channel,
                ]);
                // Hand the actual channel push over to the queue.
                // SendChannelJob::class resolves to "App\Job\SendChannelJob"
                // in this namespace without relying on unescaped backslashes.
                QueueClient::push(SendChannelJob::class, [
                    "userId" => $id,
                    "messageId" => $this->messageId,
                    "channel" => $channel,
                ]);
            }
        }
    }
}