Files
hyperf_service/app/Amqp/Consumer/WxSubMessageConsumer.php
2025-04-01 16:27:10 +08:00

76 lines
2.2 KiB
PHP

<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use App\Constants\Common\ThirdCode;
use App\Constants\Common\WxMiniCode;
use App\Exception\ErrException;
use App\Lib\Log;
use App\Model\UserThird;
use App\Service\ServiceTrait\Api\WxMiniTrait;
use Exception;
use Hyperf\Amqp\Message\Type;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Di\Annotation\Inject;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
#[Consumer(exchange: 'wxSubMessage', routingKey: 'wxSubMessage', queue: 'wxSubMessage.send', name: "WxSubMessageConsumer", nums: 1)]
class WxSubMessageConsumer extends ConsumerMessage
{
use WxMiniTrait;
/**
* @var Type|string 消息类型
*/
protected Type|string $type = Type::DIRECT;
/**
* @var Log
*/
#[Inject]
protected Log $log;
/**
* @var UserThird
*/
#[Inject]
protected UserThird $userThirdModel;
/**
* @param $data
* @param AMQPMessage $message
* @return Result
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function consumeMessage($data, AMQPMessage $message): Result
{
if (!$data['type'] || !$data['user_id'] || !$data['data']) {
$this->log->error('WxSubMessageConsumer:error:NoData:'.json_encode($data));
return Result::ACK;
}
$this->log->debug('WxSubMessageConsumer:'.json_encode($data));
$openId = $this->userThirdModel->where('user_id',$data['user_id'])->where('type',ThirdCode::WX_LOGIN)->value('open_id') ?? null;
if (empty($openId)) {
$this->log->error('WxSubMessageConsumer:error:NoOpenId:'.json_encode($data));
return Result::ACK;
}
try {
$this->sendSubMessage(WxMiniCode::TEMPLATE_ID_LIST[$data['type']],$openId,$data['data'],$data['page'] ?? null);
} catch (Exception|ErrException $e) {
$this->log->error('WxSubMessageConsumer:error:'.$e->getMessage().':data:'.json_encode($data));
}
return Result::ACK;
}
}