264 lines
8.1 KiB
PHP
264 lines
8.1 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
namespace App\Amqp\Consumer;
|
|
|
|
use App\Cache\Redis\Api\ApiRedisKey;
|
|
use App\Constants\Common\OrderCode;
|
|
use App\Lib\Log;
|
|
use App\Model\Order;
|
|
use App\Model\OrderGood;
|
|
use App\Model\Sku;
|
|
use App\Service\ServiceTrait\Api\OrderTrait;
|
|
use App\Service\ServiceTrait\Common\CycleTrait;
|
|
use App\Service\ServiceTrait\Common\StockTrait;
|
|
use Exception;
|
|
use Hyperf\Amqp\Message\Type;
|
|
use Hyperf\Amqp\Result;
|
|
use Hyperf\Amqp\Annotation\Consumer;
|
|
use Hyperf\Amqp\Message\ConsumerMessage;
|
|
use Hyperf\DbConnection\Db;
|
|
use Hyperf\Di\Annotation\Inject;
|
|
use Hyperf\Redis\Redis;
|
|
use PhpAmqpLib\Message\AMQPMessage;
|
|
use Psr\Container\ContainerExceptionInterface;
|
|
use Psr\Container\NotFoundExceptionInterface;
|
|
use Throwable;
|
|
|
|
#[Consumer(exchange: 'OrderGoodStock', routingKey: 'OrderGoodStock', queue: 'OrderGoodStock.change', name: "OrderGoodStockConsumer", nums: 1)]
|
|
class OrderGoodStockConsumer extends ConsumerMessage
|
|
{
|
|
use StockTrait;
|
|
use CycleTrait;
|
|
|
|
/**
|
|
* @var Type|string 消息类型
|
|
*/
|
|
protected Type|string $type = Type::DIRECT;
|
|
|
|
/**
|
|
* @var Log $log
|
|
*/
|
|
#[Inject]
|
|
protected Log $log;
|
|
|
|
/**
|
|
* @var OrderGood
|
|
*/
|
|
#[Inject]
|
|
protected OrderGood $orderGoodModel;
|
|
|
|
/**
|
|
* @var Sku
|
|
*/
|
|
#[Inject]
|
|
protected Sku $skuModel;
|
|
|
|
/**
|
|
* @var Order
|
|
*/
|
|
#[Inject]
|
|
protected Order $orderModel;
|
|
|
|
/**
|
|
* @var Order
|
|
*/
|
|
protected Order $orderInfo;
|
|
|
|
|
|
/**
|
|
* @var array
|
|
*/
|
|
private array $orderGoodArr;
|
|
|
|
/**
|
|
* @var array
|
|
*/
|
|
private array $skuArr;
|
|
|
|
/**
|
|
* @var array
|
|
*/
|
|
private array $updateArr;
|
|
|
|
/**
|
|
* @var array
|
|
*/
|
|
private array $reduceStockCache = [];
|
|
|
|
/**
|
|
* @param $data
|
|
* @param AMQPMessage $message
|
|
* @return Result
|
|
* @throws ContainerExceptionInterface
|
|
* @throws NotFoundExceptionInterface
|
|
*/
|
|
public function consumeMessage($data, AMQPMessage $message): Result
|
|
{
|
|
if (!$data['order_id'] || !$data['type']) {
|
|
$this->log->error('OrderGoodStockConsumer:error:NoData:'.json_encode($data));
|
|
return Result::ACK;
|
|
}
|
|
|
|
$orderId = (int)$data['order_id'];
|
|
$this->orderInfo = $this->orderModel->getInfoById($orderId);
|
|
|
|
if ($this->orderInfo->cycle_id != $this->initTodayCycleId()) {
|
|
$this->log->error('OrderGoodStockConsumer:error:cycleIdError:'.json_encode([
|
|
'order_id' => $orderId,
|
|
'cycle_id' => $this->initTodayCycleId(),
|
|
'order' => $this->orderInfo->toArray(),
|
|
]));
|
|
return Result::ACK;
|
|
}
|
|
|
|
$this->orderGoodArr = [];
|
|
$this->skuArr = [];
|
|
|
|
|
|
|
|
$this->orderGoodArr = $this->orderGoodModel->getGoodIdsByOrderId($orderId);
|
|
// $this->log->debug('OrderGoodStockConsumer:'.json_encode($this->orderGoodArr));
|
|
if (empty($this->orderGoodArr)) {
|
|
$this->log->debug('OrderGoodStockConsumer:error:NoOrderGoodData:'.json_encode($orderId));
|
|
return Result::ACK;
|
|
}
|
|
|
|
$this->skuArr = $this->skuModel->getDataArrByIds(array_column($this->orderGoodArr, 'sku_id'));
|
|
if (empty($this->skuArr)) {
|
|
$this->log->debug('OrderGoodStockConsumer:error:NoSkuData:'.json_encode(array_column($this->orderGoodArr, 'sku_id')));
|
|
return Result::ACK;
|
|
}
|
|
$this->skuArr = array_column($this->skuArr, null,'id');
|
|
|
|
$this->updateArr = [];
|
|
|
|
try {
|
|
// 是否做个优化 截单后 不再增加库存
|
|
match ($data['type']) {
|
|
OrderCode::WAIT_PAY => $this->waitPaySubStock(),
|
|
OrderCode::CANCEL => $this->cancelAddStock(),
|
|
OrderCode::FINISH_REFUND => $this->RefundUpdateData(),// 用户退款后 库存回收 后台操作不回收 因为存在退款一部分 无法核查
|
|
default => throw new Exception('OrderGoodStockConsumer:error:无效的订单类型')
|
|
};
|
|
|
|
if (empty($this->updateArr)) {
|
|
$this->log->debug('OrderGoodStockConsumer:error:NoUpdateData:skuInfo:'.json_encode($this->skuArr).':orderGoodArr:'.json_encode($this->orderGoodArr));
|
|
return Result::ACK;
|
|
}
|
|
|
|
$updateModel = new Sku();
|
|
try {
|
|
Db::beginTransaction();
|
|
|
|
foreach (array_chunk($this->updateArr, 500) as $chunk) {
|
|
foreach ($chunk as $item) {
|
|
$updateModel->where('id',$item['id'])->update(array_diff_key($item, ['id' => null]));
|
|
}
|
|
}
|
|
|
|
Db::commit(); // 确认提交
|
|
} catch (Throwable $e) {
|
|
Db::rollBack(); // 出现异常时回滚
|
|
$this->log->debug('OrderGoodStockConsumer:error:UpdateSkuDataFail:msg:'.$e->getMessage().':data:'.json_encode($this->updateArr));
|
|
return Result::ACK;
|
|
}
|
|
|
|
|
|
|
|
|
|
return Result::ACK;
|
|
} catch (Exception $e) {
|
|
if (!empty($this->rollbackStockCache)) {
|
|
foreach ($this->rollbackStockCache as $goodId => $stock) {
|
|
$this->redis->zIncrBy($this->stockKey,-$stock,$goodId);
|
|
}
|
|
}
|
|
|
|
$this->log->error($e->getMessage());
|
|
return Result::ACK;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @var Redis
|
|
*/
|
|
#[Inject]
|
|
protected Redis $redis;
|
|
|
|
/**
|
|
* @var array
|
|
*/
|
|
private array $rollbackStockCache = [];
|
|
|
|
/**
|
|
* @var string
|
|
*/
|
|
private string $stockKey;
|
|
|
|
private function stockCacheAction()
|
|
{
|
|
$this->stockKey = ApiRedisKey::goodStockKey($this->orderInfo->cycle_id,$this->orderInfo->kitchen_id);
|
|
if (empty($this->reduceStockCache)) return;
|
|
|
|
$this->rollbackStockCache = [];
|
|
|
|
foreach ($this->reduceStockCache as $goodId => $stock) {
|
|
$this->rollbackStockCache[$goodId] = $stock;
|
|
if (!($this->redis->zIncrBy($this->stockKey,$stock,$goodId))) {
|
|
throw new Exception('cache error');
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @return void
|
|
*/
|
|
private function waitPaySubStock(): void
|
|
{
|
|
foreach ($this->orderGoodArr as $orderGood) {
|
|
$this->updateArr[] = [
|
|
'id' => $orderGood['sku_id'],
|
|
'sales_num' => ($this->skuArr[$orderGood['sku_id']]['sales_num'] ?? 0) + $orderGood['quantity'],
|
|
'order_num' => ($this->skuArr[$orderGood['sku_id']]['order_num'] ?? 0) + $orderGood['quantity'],
|
|
'surplus_stock' => ($this->skuArr[$orderGood['sku_id']]['surplus_stock'] ?? 0) - $orderGood['quantity'],
|
|
];
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @return void
|
|
*/
|
|
private function RefundUpdateData(): void
|
|
{
|
|
foreach ($this->orderGoodArr as $orderGood) {
|
|
$this->updateArr[] = [
|
|
'id' => $orderGood['sku_id'],
|
|
'refund_num' => ($this->skuArr[$orderGood['sku_id']]['cancel_num'] ?? 0) + $orderGood['quantity'],
|
|
'order_num' => ($this->skuArr[$orderGood['sku_id']]['order_num'] ?? 0) - $orderGood['quantity'],
|
|
'surplus_stock' => ($this->skuArr[$orderGood['sku_id']]['surplus_stock'] ?? 0) + $orderGood['quantity'],
|
|
];
|
|
|
|
$this->reduceStockCache[$orderGood['sku_id']] = $orderGood['quantity'] + ($this->reduceStockCache[$orderGood['sku_id']] ?? 0);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @return void
|
|
*/
|
|
private function cancelAddStock(): void
|
|
{
|
|
foreach ($this->orderGoodArr as $orderGood) {
|
|
$this->updateArr[] = [
|
|
'id' => $orderGood['sku_id'],
|
|
'cancel_num' => ($this->skuArr[$orderGood['sku_id']]['cancel_num'] ?? 0) + $orderGood['quantity'],
|
|
'order_num' => ($this->skuArr[$orderGood['sku_id']]['order_num'] ?? 0) - $orderGood['quantity'],
|
|
'surplus_stock' => ($this->skuArr[$orderGood['sku_id']]['surplus_stock'] ?? 0) + $orderGood['quantity'],
|
|
];
|
|
|
|
$this->reduceStockCache[$orderGood['sku_id']] = $orderGood['quantity'] + ($this->reduceStockCache[$orderGood['sku_id']] ?? 0);
|
|
}
|
|
}
|
|
}
|