123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- import time
- import threading
- import json
- import sys
- from rds_model.rds_dy_live_reward_request_list import RdsDyLiveRewardRequestList
- from log.print_log import PrintLog
- from libs.mysql_dy_live import MysqlDyLive
- from libs.dy_live_reward_info import DyLiveRewardInfo
def scrape(scrape_interval=120, status_check_every=10):
    """Worker loop that scrapes live-room reward rankings forever.

    Each iteration takes one JSON-encoded room request from the module-level
    queue ``rds`` (created in the ``__main__`` block) and:

    * drops requests that lack ``room_id`` or ``uid``;
    * on every ``status_check_every``-th visit of a room, looks the live up in
      MySQL and drops the request if it is not found or its status is 4
      (live ended);
    * if the room was last scraped more than ``scrape_interval`` seconds ago,
      stamps a new ``scrape_time``, pushes the request back onto the queue,
      fetches the reward ranking and pushes it (tagged with ``room_id`` and
      ``uid``) onto the data list;
    * otherwise pushes the request back, changed only in its visit counter.

    Args:
        scrape_interval: minimum number of seconds between two scrapes of the
            same room.
        status_check_every: number of visits of a room between two live-status
            checks; must be >= 1.

    Raises:
        ValueError: if ``status_check_every`` is smaller than 1.

    Never returns. Exceptions raised while handling a single request are
    logged and the loop continues with the next request.
    """
    if status_check_every < 1:
        raise ValueError('status_check_every must be >= 1')
    while True:
        # Bound before anything can fail so the except handler never logs a
        # stale ID from the previous iteration (or hits an unbound name).
        room_id = ''
        try:
            # Take one room from the pending-scrape queue.
            room_info = rds.get_request_param()
            # Nothing queued: back off for 0.1 s and poll again.
            if room_info is None:
                time.sleep(0.1)
                continue
            room_dict = json.loads(room_info)
            raw_room_id = room_dict.get('room_id')  # live room ID
            uid = room_dict.get('uid')  # streamer ID
            # Validate the raw value: str(None) would yield 'None' and slip
            # past an `is None` check.
            if raw_room_id is None or uid is None:
                PrintLog.print(
                    time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + '请求数据数据异常!' + '\n'
                    + room_info
                )
                time.sleep(1)
                continue
            room_id = str(raw_room_id)
            scrape_time = room_dict['scrape_time']  # last scrape, epoch seconds
            # Advance the per-room visit counter; it wraps to 0 once every
            # `status_check_every` visits, which triggers a live-status check.
            room_dict['times'] = (room_dict.get('times', 0) + 1) % status_check_every
            if room_dict['times'] == 0:
                # NOTE(review): an exception from this lookup drops the room
                # from the queue, because the except handler does not requeue.
                live_info = MysqlDyLive().get_live_info(room_id)
                if live_info is None:  # live not found: do not requeue
                    continue
                # Do not rebind room_id from the row: the DB value may not be a
                # str, which would break the log concatenations below and
                # change the type written into the payload.
                _pk_id, _db_room_id, status = live_info
                if int(status) == 4:  # live has ended: do not requeue
                    continue
            now = int(time.time())
            time_diff = now - int(scrape_time)
            if time_diff <= scrape_interval:
                # Scraped recently: requeue without fetching.
                print('直播ID%s %d秒前曾爬取过,暂无需继续抓取' % (room_id, time_diff))
                time.sleep(0.1)
                rds.push_request_id(json.dumps(room_dict))
                continue
            # Stamp and requeue *before* fetching so the room stays in the
            # rotation even if the fetch below raises.
            room_dict['scrape_time'] = now
            rds.push_request_id(json.dumps(room_dict))
            # A None response is treated like a response without data.
            response_json = DyLiveRewardInfo.get_data(room_id) or {}
            reward_data = response_json.get('data')
            ranks = None if reward_data is None else reward_data.get('ranks')
            if ranks is None:
                PrintLog.print(time.strftime("%H:%M:%S", time.localtime()) + ' 获取打赏数据异常:' + room_id + ' ' + str(uid))
                rds.record_score(0)
                continue
            # Empty ranking: nothing to store.
            if len(ranks) == 0:
                PrintLog.print(time.strftime("%H:%M:%S", time.localtime()) + ' 没有打赏数据:' + room_id + ' ' + str(uid))
                continue
            data = json.dumps({
                "data": reward_data,
                "extra": {
                    'room_id': room_id,
                    'uid': uid,
                }
            })
            rds.record_score(1)
            rds.push_data_list(data)
        except Exception as e:
            # Log and keep the worker alive; move on to the next request.
            PrintLog.print(time.strftime("%H:%M:%S", time.localtime()) + ' ' + room_id + '数据异常:' + str(e))
            time.sleep(0.1)
if __name__ == "__main__":
    # Script entry point: create the shared request queue, report its backlog,
    # then launch the scraper worker threads. scrape() loops forever and the
    # threads are non-daemon, so the process keeps running after this block.
    print("主方法开始执行")
    # Module-level global; every scrape() worker reads this same object.
    rds = RdsDyLiveRewardRequestList()
    started_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    backlog = str(rds.get_len())
    print(started_at + ' ' + ' 开始执行,待爬取直播队列长度:' + backlog)
    # Workers are named 1..79 (Thread converts the int name to str).
    workers = [threading.Thread(target=scrape, name=worker_no) for worker_no in range(1, 80)]
    for worker in workers:
        worker.start()
|