infoMessage(date('Y-m-d H:i:s') . ' 开始执行~');
        $param = $this->argument('param');
        if(empty($param) || !in_array($param, [1,2])) {
            $this->infoMessage('执行参数错误');
        }
        switch($param) {
            case 1:
                // produce
                $this->produce();
                break;
            case 2:
                // consume
                $this->consume();
                break;
        }
        $this->infoMessage(date('Y-m-d H:i:s') . ' 结束执行~');
        return ;
    }

    /**
     * Build one queue item per (novel, date) pair and push it onto the Redis list.
     *
     * Every enabled AccountConf row is expanded day by day from its start_date
     * to its end_date. Rows that share the same novel and date are merged into
     * a single item whose `app_list` collects their app ids.
     *
     * Terminates the process (die) when there is nothing to enqueue.
     */
    public function produce()
    {
        $accountConf = AccountConf::query()
            ->where('enable', 1)
            ->get();
        // get() returns a collection object, for which empty() is always false;
        // isEmpty() is the check that actually detects "no rows".
        if ($accountConf->isEmpty()) {
            $this->infoMessage(' 暂无小说配置数据');
            die;
        }

        $novelDataList = NovelConf::query()
            ->where('enable', 1)
            // ->where('id', 108)
            ->get();
        if ($novelDataList->isEmpty()) {
            $this->infoMessage(' 暂无启用的小说数据');
            die;
        }

        $novelDataList = $novelDataList->keyBy('id')->toArray();
        $accountConf = $accountConf->toArray();

        $novelData = [];
        foreach ($accountConf as $conf) {
            // Skip configurations whose novel is not enabled / not found.
            if (!isset($novelDataList[$conf['novel_id']])) {
                continue;
            }

            $startDate = $conf['start_date'];
            $endDate = $conf['end_date'];

            // Walk the date range one day at a time (string comparison of dates).
            $day = $startDate;
            while ($day <= $endDate) {
                $key = $conf['novel_id'] . '###' . $day;
                if (!isset($novelData[$key])) {
                    $novelData[$key]['novel_id'] = $conf['novel_id'];
                    $novelData[$key]['subscribed_date'] = $day;
                }
                $novelData[$key]['app_list'][] = $conf['app_id'];

                $day = date('Y-m-d', strtotime($day . ' +1 day'));
            }
        }

        if (empty($novelData)) {
            $this->infoMessage(' 小说数据整理数据结果为空');
            die;
        }

        foreach ($novelData as $novel) {
            $payload = json_encode($novel, JSON_UNESCAPED_UNICODE);
            RedisModel::lPush($this->redisKey, $payload);
        }
    }

    /**
     * Pop queue items from the Redis list and process them with handleData().
     *
     * Runs for at most 1200 seconds, then terminates the process (die).
     * When the queue is empty it sleeps 10 seconds before polling again.
     * An item whose processing throws an \Exception is pushed back onto the list.
     */
    public function consume()
    {
        $novelDataList = NovelConf::query()
            ->where('enable', 1)
            ->get();
        // empty() on the collection object is always false; without a real
        // emptiness check every popped item would be dropped by handleData()
        // because no novel could be resolved for it.
        if ($novelDataList->isEmpty()) {
            $this->infoMessage(' 暂无启用的小说数据');
            return ;
        }
        $novelDataList = $novelDataList->keyBy('id')->toArray();

        $startedAt = time();
        while (1) {
            if (time() - $startedAt >= 1200) {
                die;
            }

            $val = RedisModel::rPop($this->redisKey);
            if (empty($val)) {
                sleep(10);
                continue;
            }

            try {
                $this->handleData(json_decode($val, 1), $novelDataList);
            } catch (\Exception $exception) {
                // On failure, put the item back into the queue.
                RedisModel::lPush($this->redisKey, $val);
                $this->infoMessage(' 异常###file:'.$exception->getFile().';line:'.$exception->getLine().';message:'.$exception->getMessage());
            }
        }
    }

    /**
     * Compute the statistics for one queue item (one novel on one date) and store them.
     *
     * @param array $v             Decoded queue item; reads `novel_id`,
     *                             `subscribed_date` and `app_list`.
     * @param array $novelDataList Enabled novels keyed by id; `name` is read from the match.
     *
     * @return void Returns early, without storing anything, when the novel id is
     *              not present in $novelDataList.
     */
    public function handleData($v, $novelDataList)
    {
        $novelInfo = isset($novelDataList[$v['novel_id']]) ? $novelDataList[$v['novel_id']] : null;
        if (is_null($novelInfo)) {
            return ;
        }

        // Defaults for every output field.
        $v['novel_name'] = $novelInfo['name'];
        $v['first_day_roi'] = 0;
        $v['new_user_cost'] = 0;
        $v['new_user_charge'] = 0;
        $v['day_paid'] = 0;
        $v['charge_total'] = 0;
        $v['cost_cover_rate'] = 0;
        $v['fans_increase'] = 0;
        $v['new_user_charge_uv'] = 0;
        $v['new_user_charge_pv'] = 0;
        $v['charge_data'] = '';
        $v['charge_user_cost'] = 0;
        $v['subscribe_date'] = $v['subscribed_date'];
        $v['start_date'] = $v['end_date'] = $v['subscribed_date'];
        $chargeData = [];

        $this->infoMessage(' 开始处理小说:'.$v['novel_name'].'在'.$v['start_date'].'日的数据;');

        // Look up the ADQ accounts bound to the item's apps on that date.
        if (!empty($v['app_list'])) {
            $adqAccountList = ADQAccountBinding::query()
                ->whereIn('app_id', $v['app_list'])
                ->where('start_date', '<=', $v['subscribed_date'])
                ->where('end_date', '>=', $v['subscribed_date'])
                ->where('status', 1)
                ->where('enable', 1)
                ->get();
            $adqAccountIdList = $adqAccountList->isNotEmpty()
                ? array_column($adqAccountList->toArray(), 'account_id')
                : [];
        } else {
            $adqAccountIdList = [];
            $adqAccountList = null;
        }

        $res = NovelDataStatistics::paymentTrendByDay($v['app_list'], $adqAccountList, $v['subscribed_date']);
        if (empty($res)) {
            $this->infoMessage(' 小说:'.$v['novel_name'].'在'.$v['start_date'].'日的趋势数据为空');
            $res = [];
        } else {
            // Normalise nested objects into plain arrays.
            $res = json_decode(json_encode($res), 1);
        }

        if (empty($v['app_list'])) {
            $this->infoMessage(' 小说:'.$v['novel_name'].'在'.$v['start_date'].'日的app_list数据为空');
            // Spend of the day
            $v['day_paid'] = 0;
            // New followers
            $v['fans_increase'] = 0;
        } else {
            // Spend / follower totals reported per app id.
            $adsReport = AdsReport::query()
                ->selectRaw('sum(paid) as paid, sum(follow_uv) as follow_uv')
                ->whereIn('app_id', $v['app_list'])
                ->where('ref_date', $v['subscribed_date'])
                ->first();

            // Spend / follower totals reported per ADQ account id.
            if (empty($adqAccountIdList)) {
                $adqAdsReport = null;
            } else {
                $adqAdsReport = AdsReport::query()
                    ->selectRaw('sum(paid) as paid, sum(follow_uv) as follow_uv')
                    ->whereIn('account_id', $adqAccountIdList)
                    ->where('ref_date', $v['subscribed_date'])
                    ->first();
            }

            // Spend of the day (stored values are divided by 100)
            $v['day_paid'] = isset($adsReport->paid) ? $v['day_paid'] + $adsReport->paid / 100 : $v['day_paid'];
            $v['day_paid'] = isset($adqAdsReport->paid) ? $v['day_paid'] + $adqAdsReport->paid / 100 : $v['day_paid'];
            // New followers
            $v['fans_increase'] = isset($adsReport->follow_uv) ? $v['fans_increase'] + $adsReport->follow_uv : $v['fans_increase'];
            $v['fans_increase'] = isset($adqAdsReport->follow_uv) ? $v['fans_increase'] + $adqAdsReport->follow_uv : $v['fans_increase'];
        }

        foreach ($res as $item) {
            // Cumulative recharge of the day's new users
            $v['charge_total'] = isset($item['charge_total']) ? $v['charge_total'] + $item['charge_total'] : $v['charge_total'];
            // Number of paying users
            $v['new_user_charge_uv'] = isset($item['new_user_charge_uv']) ? $v['new_user_charge_uv'] + $item['new_user_charge_uv'] : $v['new_user_charge_uv'];
            // Number of payments
            $v['new_user_charge_pv'] = isset($item['new_user_charge_pv']) ? $v['new_user_charge_pv'] + $item['new_user_charge_pv'] : $v['new_user_charge_pv'];

            // Per-day data from the start date onwards (WxAccountService::DAYS
            // entries; the original note says 100 days).
            for ($i = 0; $i < WxAccountService::DAYS; $i++) {
                if (isset($chargeData[$i])) {
                    // Already present: accumulate.
                    $chargeData[$i]['day_charge'] = isset($item['chargeData'][$i]['day_charge'])
                        ? $chargeData[$i]['day_charge'] + $item['chargeData'][$i]['day_charge']
                        : $chargeData[$i]['day_charge'];
                    $chargeData[$i]['charge_total'] = isset($item['chargeData'][$i]['charge_total'])
                        ? $chargeData[$i]['charge_total'] + $item['chargeData'][$i]['charge_total']
                        : $chargeData[$i]['charge_total'];
                    $chargeData[$i]['day_paid'] = isset($item['chargeData'][$i]['day_paid'])
                        ? $chargeData[$i]['day_paid'] + $item['chargeData'][$i]['day_paid']
                        : $chargeData[$i]['day_paid'];
                } else {
                    // Not present yet: initialise.
                    $chargeData[$i]['day_charge'] = isset($item['chargeData'][$i]['day_charge'])
                        ? $item['chargeData'][$i]['day_charge']
                        : 0;
                    $chargeData[$i]['charge_total'] = isset($item['chargeData'][$i]['charge_total'])
                        ? $item['chargeData'][$i]['charge_total']
                        : 0;
                    $chargeData[$i]['day_paid'] = isset($item['chargeData'][$i]['day_paid'])
                        ? $item['chargeData'][$i]['day_paid']
                        : 0;
                }
            }
        }

        /*
         * Original note: "the two items above" (presumably day_paid and
         * fans_increase) are not accumulated inside the loop because the loop
         * focuses on data that has new followers; accumulating there would lose
         * the spend of dates without new-follower data.
         */

        // First-day ROI
        $v['first_day_roi'] = isset($chargeData[0]['day_paid']) && isset($chargeData[0]['day_charge']) && $chargeData[0]['day_paid'] != 0
            ? ($chargeData[0]['day_charge'] / $chargeData[0]['day_paid'])
            : 0;
        // Cost recovery rate
        $v['cost_cover_rate'] = $v['day_paid'] != 0 ? ($v['charge_total'] / $v['day_paid']) : 0;
        // Cost per new follower
        $v['new_user_cost'] = $v['fans_increase'] != 0 ? ($v['day_paid'] / $v['fans_increase']) : 0;
        // Cost per paying user
        $v['charge_user_cost'] = $v['new_user_charge_uv'] != 0 ? ($v['day_paid'] / $v['new_user_charge_uv']) : 0;

        // Fill missing days with zeros and derive the per-day ratios.
        for ($i = 0; $i < WxAccountService::DAYS; $i++) {
            $dayCharge = isset($chargeData[$i]['day_charge']) ? $chargeData[$i]['day_charge'] : 0;
            $chargeTotal = isset($chargeData[$i]['charge_total']) ? $chargeData[$i]['charge_total'] : 0;
            $dayPaid = isset($chargeData[$i]['day_paid']) ? $chargeData[$i]['day_paid'] : 0;
            $roiWeightedPaid = $dayPaid * $v['first_day_roi'];

            $chargeData[$i]['day_charge'] = $dayCharge;
            $chargeData[$i]['charge_total'] = $chargeTotal;
            $chargeData[$i]['day_paid'] = $dayPaid;
            $chargeData[$i]['day_add'] = $dayPaid != 0 ? ($dayCharge / $dayPaid) : 0;
            $chargeData[$i]['day_cover'] = $dayPaid != 0 ? ($chargeTotal / $dayPaid) : 0;
            $chargeData[$i]['day_times'] = $roiWeightedPaid != 0 ? ($chargeTotal / $roiWeightedPaid) : 0;
        }

        $v['charge_data'] = json_encode($chargeData);
        $this->insertData($v);
    }

    /**
     * Build the payment trend of a single date for each given app id.
     *
     * @param array|mixed $appId          One app id or a list of app ids.
     * @param mixed       $adqAccountList Collection of ADQ account bindings
     *                                    (rows with `app_id` / `account_id`), or null.
     * @param string      $date           Date to evaluate.
     *
     * @return array One trend array per app id (fans_total, charge_total, app_id,
     *               day_paid, new_user_charge, new_user_charge_uv,
     *               new_user_charge_pv, first_day_roi, chargeData).
     */
    public function paymentTrendByDay($appId, $adqAccountList, $date)
    {
        // Normalise the app id argument into a list.
        if (is_array($appId)) {
            $appIdList = $appId;
        } else {
            $appIdList = [$appId];
        }

        // Account information
        $accountData = Account::query()->selectRaw("channel_id, platform_id, nickname, app_id")
            ->whereIn('app_id', $appIdList)->get();

        // Spend per date
        $paidList = AdsReport::getPaidData($appIdList, [$date]);

        // Spend of the ADQ accounts
        $adqAccountIdList = (!empty($adqAccountList) && $adqAccountList->isNotEmpty())
            ? array_column($adqAccountList->toArray(), 'account_id')
            : [];
        if (!empty($adqAccountIdList)) {
            $adqPaidList = AdsReport::getAdqPaidData($adqAccountIdList, [$date]);
        } else {
            $adqPaidList = null;
        }

        $result = [];
        foreach ($appIdList as $app_id) {
            $trend = [];
            $channelIdList = $accountData->where('app_id', $app_id)->pluck('channel_id')->toArray();
            // NOTE(review): when $channelIdList is empty, subscribedFansData()
            // applies no channel filter at all — confirm that is intended here.
            $fansData = self::subscribedFansData($channelIdList, $date, $date);
            $fansData = json_decode(json_encode($fansData), 1);

            $trend['fans_total'] = !empty($fansData) ? $fansData['fans_total'] : 0;
            $trend['charge_total'] = !empty($fansData) ? $fansData['charge_total'] / 100 : 0;
            $trend['app_id'] = $app_id;

            // Spend of the day
            $paidInfo = $paidList->where('app_id', $app_id)
                ->where('ref_date', $date)
                ->first();
            $trend['day_paid'] = isset($paidInfo->paid) ? ($paidInfo->paid / 100) : 0;

            // ADQ spend. $adqPaidList is only non-null when $adqAccountList is a
            // non-empty collection, so the per-app lookup is done inside this
            // guard; calling where() on a null list would be fatal.
            $adqPaid = 0;
            if (!empty($adqPaidList) && $adqPaidList->isNotEmpty()) {
                $appAdqAccountIds = $adqAccountList->where('app_id', $app_id)->pluck('account_id')->toArray();
                $adqPaid = $adqPaidList->whereIn('account_id', $appAdqAccountIds)->sum('paid');
            }
            $trend['day_paid'] += $adqPaid/100;

            // Recharge of the day's new users
            $newUserCharge = Order::getNewUserChargeSecond($date, $channelIdList);
            $trend['new_user_charge'] = isset($newUserCharge->new_user_charge) ? $newUserCharge->new_user_charge : 0;
            $trend['new_user_charge_uv'] = isset($newUserCharge->new_user_charge_uv) ? $newUserCharge->new_user_charge_uv : 0;
            $trend['new_user_charge_pv'] = isset($newUserCharge->new_user_charge_pv) ? $newUserCharge->new_user_charge_pv : 0;
            $trend['first_day_roi'] = $trend['day_paid'] != 0
                ? (($trend['new_user_charge'] / 100) / $trend['day_paid'])
                : 0;

            // Per-day recharge figures after subscription
            $trend['chargeData'] = WxAccountService::chargeAfterSubscribed(
                $channelIdList,
                $date,
                WxAccountService::DAYS,
                $app_id,
                $trend['first_day_roi'],
                $trend['day_paid']
            );

            $result[] = $trend;
        }

        return $result;
    }

    /**
     * Aggregate the orders of users created within a date range.
     *
     * @param array  $channelIds Channel ids to filter by; no channel filter is
     *                           applied when the list is empty.
     * @param string $startDate  First day (inclusive, from 00:00:00).
     * @param string $endDate    Last day (inclusive, until 23:59:59).
     *
     * @return mixed First row of the aggregate query (subscribed_date,
     *               fans_total, channel_id, charge_total).
     */
    public function subscribedFansData($channelIds, $startDate, $endDate)
    {
        // Aggregate over matching orders
        $list = Order::query()
            ->selectRaw("DATE_FORMAT(user_created_at, '%Y-%m-%d') as subscribed_date, count(member_openid) as fans_total, channel_id, sum(price) as charge_total")
            ->where('enable', 1)
            ->where(function($query) use($channelIds) {
                if (!empty($channelIds)) {
                    $query->whereIn('channel_id', $channelIds);
                }
            })
            ->where('user_created_at', '>=', $startDate . ' 00:00:00')
            ->where('user_created_at', '<=', $endDate . ' 23:59:59')
            ->where('status', 1)
            ->first();

        return $list;
    }

    /**
     * Payment trend (marked as deprecated in the original comment).
     *
     * @param array|mixed $appId     One app id or a list of app ids.
     * @param string      $startDate Start of the range.
     * @param string      $endDate   End of the range.
     * @param int         $page      Page number passed to Order::subscribedFansData().
     * @param int         $pageSize  Page size passed to Order::subscribedFansData().
     *
     * @return array [rows, total]; [[], 0] when the page contains no dates.
     */
    public function paymentTrend($appId, $startDate, $endDate, $page, $pageSize)
    {
        // Normalise the app id argument into a list.
        if (is_array($appId)) {
            $appIdList = $appId;
        } else {
            $appIdList = [$appId];
        }

        $channelIdList = Account::getChannelIdByAppId($appIdList);
        $channelIdList = $channelIdList->toArray();
        // $channelIds = array_column($channelIdList, 'channel_id');

        // Recharge information of the followers gained in the period
        // list($fansData, $total) = Order::subscribedFansData($channelIds, '2021-09-22', '2021-09-22', $page, $pageSize);
        list($fansData, $total) = Order::subscribedFansData($channelIdList, $startDate, $endDate, $page, $pageSize);

        // Dates present in the page
        $dateList = array_unique(array_column($fansData->toArray(), 'subscribed_date'));
        if (empty($dateList)) {
            return [[], 0];
        }

        // Channel ids present in the page
        $channelIdArr = array_unique(array_column($fansData->toArray(), 'channel_id'));

        // Account information
        $accountData = Account::getAccountListByChannelIds($channelIdArr);

        // Spend per date
        $paidList = AdsReport::getPaidData($appIdList, $dateList);

        foreach ($fansData as $trend) {
            $trend->charge_total = $trend->charge_total / 100;
            $trend->app_id = isset($accountData[$trend->channel_id]['app_id']) ? $accountData[$trend->channel_id]['app_id'] : null;

            // Spend of the day
            $paidInfo = $paidList->where('app_id', $trend->app_id)
                ->where('ref_date', $trend->subscribed_date)
                ->first();
            $trend->day_paid = isset($paidInfo->paid) ? ($paidInfo->paid / 100) : 0;

            // Recharge of the day's new users
            $newUserCharge = Order::getNewUserCharge($trend->subscribed_date, $trend->channel_id);
            $trend->new_user_charge = isset($newUserCharge->new_user_charge) ? $newUserCharge->new_user_charge : 0;
            $trend->new_user_charge_uv = isset($newUserCharge->new_user_charge_uv) ? $newUserCharge->new_user_charge_uv : 0;
            $trend->new_user_charge_pv = isset($newUserCharge->new_user_charge_pv) ? $newUserCharge->new_user_charge_pv : 0;
            $trend->first_day_roi = $trend->day_paid != 0
                ? (($trend->new_user_charge / 100) / $trend->day_paid)
                : 0;

            // Per-day recharge figures after subscription
            $trend->chargeData = WxAccountService::chargeAfterSubscribed(
                [$trend->channel_id],
                $trend->subscribed_date,
                WxAccountService::DAYS,
                $trend->app_id,
                $trend->first_day_roi,
                $trend->day_paid
            );
        }

        return [$fansData, $total];
    }

    /**
     * Insert or update the NovelData row identified by (novel_id, start_date).
     *
     * @param array $novel Computed statistics as produced by handleData().
     *
     * @return void The outcome is reported through infoMessage().
     */
    public function insertData($novel)
    {
        $insertData = [];
        $insertData['start_date'] = $novel['start_date'];
        $insertData['end_date'] = $novel['end_date'];
        $insertData['novel_name'] = $novel['novel_name'];
        $insertData['novel_id'] = $novel['novel_id'];
        $insertData['first_day_roi'] = $novel['first_day_roi'];
        $insertData['day_paid'] = $novel['day_paid'];
        $insertData['charge_total'] = $novel['charge_total'];
        $insertData['cost_cover_rate'] = $novel['cost_cover_rate'];
        $insertData['fans_increase'] = $novel['fans_increase'];
        $insertData['new_user_cost'] = $novel['new_user_cost'];
        $insertData['new_user_charge_uv'] = $novel['new_user_charge_uv'];
        $insertData['new_user_charge_pv'] = $novel['new_user_charge_pv'];
        $insertData['charge_user_cost'] = $novel['charge_user_cost'];
        $insertData['charge_data'] = $novel['charge_data'];

        // Common prefix of all four outcome messages.
        $logPrefix = ' 处理小说:'.$insertData['novel_name'].'在'.$insertData['start_date'].'日的数据完成;处理结果:';

        $row = NovelData::query()
            ->where('novel_id', $insertData['novel_id'])
            ->where('start_date', $insertData['start_date'])
            ->first();

        if ($row) {
            // Update the existing row.
            $res = NovelData::query()
                ->where('novel_id', $insertData['novel_id'])
                ->where('start_date', $insertData['start_date'])
                ->update($insertData);
            if ($res) {
                // Updated
                $this->infoMessage($logPrefix.'更新成功');
            } else {
                // Nothing was modified (original note), reported as a failed update.
                $this->infoMessage($logPrefix.'更新失败');
            }
        } else {
            // Insert a new row.
            $insertData['created_date'] = date('Y-m-d');
            $res = NovelData::query()
                ->insert($insertData);
            if ($res) {
                // Inserted
                $this->infoMessage($logPrefix.'新增成功');
            } else {
                // Insert failed
                $this->infoMessage($logPrefix.'新增失败');
            }
        }
    }

    /**
     * Echo a message prefixed with the current time and followed by the
     * current memory usage in MB.
     *
     * @param string $message Text to print.
     */
    public function infoMessage($message)
    {
        $memoryMb = round(memory_get_usage()/1024/1024, 2);
        echo date('Y-m-d H:i:s') . $message .' 内存占用:'.$memoryMb.'MB'. "\r\n";
    }
}