完整版V1 加入爬虫功能
This commit is contained in:
@@ -5,38 +5,233 @@ declare(strict_types=1);
|
||||
namespace app\common\service;
|
||||
|
||||
use app\admin\model\split\Ticket;
|
||||
use app\common\library\scrm\UnifiedScrmData;
|
||||
use think\Db;
|
||||
use think\Exception;
|
||||
|
||||
/**
|
||||
* 分流工单数据同步服务(骨架,后续按 ticket_type 对接各云控 API)
|
||||
*
|
||||
* 期望第三方接口 payload 字段映射:
|
||||
* - complete_count int 完成数量
|
||||
* - inbound_count int 进线人数
|
||||
* - speed_per_hour float 每小时进线人数
|
||||
* - number_count int 号码总数(含离线+封号)
|
||||
* - number_offline_count int 可选 离线数
|
||||
* - number_banned_count int 可选 封号数
|
||||
* - online_count int 在线人数
|
||||
* 分流工单云控数据同步服务
|
||||
*/
|
||||
class SplitTicketSyncService
|
||||
{
|
||||
/**
|
||||
* 同步单条工单(后续实现:按 ticket_type 选择适配器并请求 API)
|
||||
*/
|
||||
public function syncOne(int $ticketId): bool
|
||||
private SplitTicketNumberSyncService $numberSync;
|
||||
|
||||
private SplitTicketRuleService $ruleService;
|
||||
|
||||
private SplitTicketSyncLockService $lockService;
|
||||
|
||||
public function __construct()
|
||||
{
|
||||
$ticket = Ticket::get($ticketId);
|
||||
if (!$ticket) {
|
||||
return false;
|
||||
}
|
||||
// TODO: 调用具体云控适配器获取 $payload
|
||||
return false;
|
||||
$this->numberSync = new SplitTicketNumberSyncService();
|
||||
$this->ruleService = new SplitTicketRuleService();
|
||||
$this->lockService = new SplitTicketSyncLockService();
|
||||
}
|
||||
|
||||
/**
|
||||
* 将同步结果写入工单表
|
||||
* 同步单条工单
|
||||
*
|
||||
* @return array{success:bool,message:string,skipped?:bool}
|
||||
*/
|
||||
public function syncOne(int $ticketId, bool $force = false): array
|
||||
{
|
||||
$ticket = Ticket::get($ticketId);
|
||||
if (!$ticket) {
|
||||
SplitTicketSyncLogger::log('sync', 'ticket not found', ['ticketId' => $ticketId]);
|
||||
return ['success' => false, 'message' => '工单不存在'];
|
||||
}
|
||||
|
||||
SplitTicketSyncLogger::setTicketContext($ticketId, (string) $ticket['ticket_type']);
|
||||
SplitTicketSyncLogger::log('sync', 'syncOne start', [
|
||||
'force' => $force,
|
||||
'status' => (string) $ticket['status'],
|
||||
'syncFailCount' => (int) ($ticket['sync_fail_count'] ?? 0),
|
||||
'syncTime' => (int) ($ticket['sync_time'] ?? 0),
|
||||
'pageUrl' => (string) $ticket['ticket_url'],
|
||||
'nodeHost' => SplitSyncConfigService::getNodeHost(),
|
||||
]);
|
||||
|
||||
if (!$force) {
|
||||
$skip = $this->shouldSkip($ticket);
|
||||
if ($skip !== null) {
|
||||
SplitTicketSyncLogger::log('sync', 'skipped', ['reason' => $skip]);
|
||||
SplitTicketSyncLogger::clearTicketContext();
|
||||
return ['success' => false, 'message' => $skip, 'skipped' => true];
|
||||
}
|
||||
}
|
||||
|
||||
if (!$this->lockService->acquire($ticketId)) {
|
||||
SplitTicketSyncLogger::log('sync', 'lock busy', ['ticketId' => $ticketId]);
|
||||
SplitTicketSyncLogger::clearTicketContext();
|
||||
return ['success' => false, 'message' => '工单正在同步中', 'skipped' => true];
|
||||
}
|
||||
|
||||
try {
|
||||
$result = $this->doSync($ticket);
|
||||
SplitTicketSyncLogger::log('sync', 'syncOne end', $result);
|
||||
return $result;
|
||||
} finally {
|
||||
$this->lockService->release($ticketId);
|
||||
SplitTicketSyncLogger::clearTicketContext();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 扫描到期工单并同步
|
||||
*/
|
||||
public function syncDueTickets(): int
|
||||
{
|
||||
$count = 0;
|
||||
$failThreshold = SplitSyncConfigService::getFailPauseThreshold();
|
||||
$query = Ticket::where('status', 'normal');
|
||||
if ($failThreshold > 0) {
|
||||
$query->where('sync_fail_count', '<', $failThreshold);
|
||||
}
|
||||
$list = $query->select();
|
||||
|
||||
SplitTicketSyncLogger::log('cron', 'scan start', [
|
||||
'candidateCount' => count($list),
|
||||
]);
|
||||
|
||||
foreach ($list as $ticket) {
|
||||
$skip = $this->shouldSkip($ticket);
|
||||
if ($skip !== null) {
|
||||
SplitTicketSyncLogger::log('cron', 'candidate skipped', [
|
||||
'ticketId' => (int) $ticket['id'],
|
||||
'ticketType' => (string) $ticket['ticket_type'],
|
||||
'reason' => $skip,
|
||||
]);
|
||||
continue;
|
||||
}
|
||||
$result = $this->syncOne((int) $ticket['id'], false);
|
||||
if (!empty($result['skipped'])) {
|
||||
continue;
|
||||
}
|
||||
$count++;
|
||||
}
|
||||
|
||||
SplitTicketSyncLogger::log('cron', 'scan end', ['processedCount' => $count]);
|
||||
return $count;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array{success:bool,message:string}
|
||||
*/
|
||||
private function doSync(Ticket $ticket): array
|
||||
{
|
||||
$ticketType = (string) $ticket['ticket_type'];
|
||||
$pageUrl = trim((string) $ticket['ticket_url']);
|
||||
if ($pageUrl === '') {
|
||||
SplitTicketSyncLogger::log('sync', 'empty pageUrl');
|
||||
$this->markFailure($ticket, '工单链接为空');
|
||||
return ['success' => false, 'message' => '工单链接为空'];
|
||||
}
|
||||
|
||||
if (!SplitScrmSpiderFactory::isSupported($ticketType)) {
|
||||
SplitTicketSyncLogger::log('sync', 'spider not supported', ['ticketType' => $ticketType]);
|
||||
$this->markFailure($ticket, '工单类型尚未实现蜘蛛');
|
||||
return ['success' => false, 'message' => '工单类型尚未实现蜘蛛'];
|
||||
}
|
||||
|
||||
SplitTicketSyncLogger::log('sync', 'create spider', [
|
||||
'ticketType' => $ticketType,
|
||||
'hasAccount' => trim((string) ($ticket['account'] ?? '')) !== '',
|
||||
]);
|
||||
$spider = SplitScrmSpiderFactory::create(
|
||||
$ticketType,
|
||||
$pageUrl,
|
||||
(string) ($ticket['account'] ?? ''),
|
||||
(string) ($ticket['password'] ?? '')
|
||||
);
|
||||
if ($spider === null) {
|
||||
$this->markFailure($ticket, '无法创建蜘蛛实例');
|
||||
return ['success' => false, 'message' => '无法创建蜘蛛实例'];
|
||||
}
|
||||
|
||||
Db::startTrans();
|
||||
try {
|
||||
SplitTicketSyncLogger::log('sync', 'spider run begin');
|
||||
$finalData = $spider->run();
|
||||
if (!$finalData instanceof UnifiedScrmData) {
|
||||
throw new Exception('蜘蛛返回数据无效');
|
||||
}
|
||||
|
||||
$this->numberSync->syncFromUnifiedData($ticket, $finalData);
|
||||
|
||||
$completeCount = max(0, $finalData->todayNewCount);
|
||||
$this->ruleService->applyTicketStatusRules($ticket, $completeCount);
|
||||
|
||||
$freshTicket = Ticket::get((int) $ticket['id']) ?: $ticket;
|
||||
if ((string) $freshTicket['status'] === 'hidden') {
|
||||
$this->ruleService->cascadeTicketClosedToNumbers($freshTicket);
|
||||
}
|
||||
// 号码开关最后统一由 applyNumberRules 判定(单号上限/下号比率/云控在线)
|
||||
$this->ruleService->applyNumberRules($freshTicket);
|
||||
$ticket = $freshTicket;
|
||||
|
||||
$inboundCount = $this->numberSync->sumInboundForTicket($ticket);
|
||||
$speed = $this->calcSpeedPerHour($ticket, $completeCount);
|
||||
|
||||
$payload = [
|
||||
'complete_count' => $completeCount,
|
||||
'inbound_count' => $inboundCount,
|
||||
'speed_per_hour' => $speed['speed'],
|
||||
'number_count' => max(0, $finalData->total),
|
||||
'number_offline_count' => max(0, $finalData->totalOffline),
|
||||
'number_banned_count' => 0,
|
||||
'online_count' => max(0, $finalData->totalOnline),
|
||||
'sync_fail_count' => 0,
|
||||
'speed_snapshot_count' => $speed['snapshot_count'],
|
||||
'speed_snapshot_time' => $speed['snapshot_time'],
|
||||
];
|
||||
|
||||
$this->applySyncResult($ticket, $payload, true, '');
|
||||
Db::commit();
|
||||
SplitTicketSyncLogger::log('sync', 'db commit ok', $payload);
|
||||
return ['success' => true, 'message' => '同步成功'];
|
||||
} catch (\Throwable $e) {
|
||||
Db::rollback();
|
||||
$msg = mb_substr($e->getMessage(), 0, 255, 'UTF-8');
|
||||
SplitTicketSyncLogger::log('sync', 'exception', [
|
||||
'type' => get_class($e),
|
||||
'message' => $msg,
|
||||
'file' => $e->getFile(),
|
||||
'line' => $e->getLine(),
|
||||
]);
|
||||
$this->markFailure($ticket, $msg);
|
||||
return ['success' => false, 'message' => $msg];
|
||||
}
|
||||
}
|
||||
|
||||
private function shouldSkip(Ticket $ticket): ?string
|
||||
{
|
||||
if ((string) $ticket['status'] === 'hidden') {
|
||||
return '工单已关闭';
|
||||
}
|
||||
$failThreshold = SplitSyncConfigService::getFailPauseThreshold();
|
||||
if ($failThreshold > 0 && (int) ($ticket['sync_fail_count'] ?? 0) >= $failThreshold) {
|
||||
return sprintf('连续同步失败超过%d次已暂停', $failThreshold);
|
||||
}
|
||||
if (!SplitScrmSpiderFactory::isSupported((string) $ticket['ticket_type'])) {
|
||||
return '工单类型尚未实现';
|
||||
}
|
||||
$interval = SplitSyncConfigService::getIntervalMinutes((string) $ticket['ticket_type']);
|
||||
if ($interval <= 0) {
|
||||
return '该类型未配置自动同步周期';
|
||||
}
|
||||
$lastSync = (int) ($ticket['sync_time'] ?? 0);
|
||||
$elapsed = $lastSync > 0 ? (time() - $lastSync) : null;
|
||||
if ($lastSync > 0 && $elapsed !== null && $elapsed < ($interval * 60)) {
|
||||
SplitTicketSyncLogger::log('sync', 'interval not reached', [
|
||||
'intervalMinutes' => $interval,
|
||||
'elapsedSeconds' => $elapsed,
|
||||
'needSeconds' => $interval * 60,
|
||||
]);
|
||||
return '未到同步周期';
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<string, mixed> $payload
|
||||
*/
|
||||
public function applySyncResult(Ticket $ticket, array $payload, bool $success, string $message = ''): void
|
||||
@@ -52,9 +247,74 @@ class SplitTicketSyncService
|
||||
'sync_status' => $success ? 'success' : 'error',
|
||||
'sync_time' => time(),
|
||||
'sync_message' => $success ? '' : mb_substr($message, 0, 255, 'UTF-8'),
|
||||
'sync_fail_count' => $success ? 0 : ((int) ($ticket['sync_fail_count'] ?? 0) + 1),
|
||||
'speed_snapshot_count' => (int) ($payload['speed_snapshot_count'] ?? $ticket['speed_snapshot_count'] ?? 0),
|
||||
'speed_snapshot_time' => (int) ($payload['speed_snapshot_time'] ?? $ticket['speed_snapshot_time'] ?? 0),
|
||||
];
|
||||
if (!$ticket->allowField(array_keys($data))->save($data)) {
|
||||
throw new Exception('工单同步结果保存失败');
|
||||
}
|
||||
}
|
||||
|
||||
private function markFailure(Ticket $ticket, string $message): void
|
||||
{
|
||||
$failCount = (int) ($ticket['sync_fail_count'] ?? 0) + 1;
|
||||
$failThreshold = SplitSyncConfigService::getFailPauseThreshold();
|
||||
$previousSyncStatus = (string) ($ticket['sync_status'] ?? 'pending');
|
||||
$neverSyncedSuccessfully = $previousSyncStatus === 'pending' && (int) ($ticket['sync_time'] ?? 0) <= 0;
|
||||
$update = [
|
||||
'sync_status' => 'error',
|
||||
'sync_time' => time(),
|
||||
'sync_message' => mb_substr($message, 0, 255, 'UTF-8'),
|
||||
'sync_fail_count' => $failCount,
|
||||
];
|
||||
// 新建工单首次同步失败:立即关闭;已同步过的工单仍按连续失败阈值关闭
|
||||
if ($neverSyncedSuccessfully || ($failThreshold > 0 && $failCount >= $failThreshold)) {
|
||||
$update['status'] = 'hidden';
|
||||
}
|
||||
$ticket->save($update);
|
||||
if (isset($update['status']) && $update['status'] === 'hidden') {
|
||||
$fresh = Ticket::get((int) $ticket['id']);
|
||||
if ($fresh) {
|
||||
$this->ruleService->cascadeTicketClosedToNumbers($fresh);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array{speed:float,snapshot_count:int,snapshot_time:int}
|
||||
*/
|
||||
private function calcSpeedPerHour(Ticket $ticket, int $currentComplete): array
|
||||
{
|
||||
$now = time();
|
||||
$snapshotTime = (int) ($ticket['speed_snapshot_time'] ?? 0);
|
||||
$snapshotCount = (int) ($ticket['speed_snapshot_count'] ?? 0);
|
||||
|
||||
if ($snapshotTime <= 0) {
|
||||
return [
|
||||
'speed' => 0.0,
|
||||
'snapshot_count' => $currentComplete,
|
||||
'snapshot_time' => $now,
|
||||
];
|
||||
}
|
||||
|
||||
$elapsed = $now - $snapshotTime;
|
||||
if ($elapsed >= 3600) {
|
||||
return [
|
||||
'speed' => 0.0,
|
||||
'snapshot_count' => $currentComplete,
|
||||
'snapshot_time' => $now,
|
||||
];
|
||||
}
|
||||
|
||||
$hours = $elapsed > 0 ? ($elapsed / 3600) : 0;
|
||||
$delta = $currentComplete - $snapshotCount;
|
||||
$speed = ($delta < 0 || $hours <= 0) ? 0.0 : round($delta / $hours, 2);
|
||||
|
||||
return [
|
||||
'speed' => $speed,
|
||||
'snapshot_count' => $snapshotCount,
|
||||
'snapshot_time' => $snapshotTime,
|
||||
];
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user