import time
import threading
import json
from rds_model.rds_room_info_request_list import RdsRoomInfoRequestList
from libs.mysql_dy_live_market_record import MysqlDyLiveMarketRecord
from libs.room_info import RoomInfo
from log.print_log import PrintLog


def scrape():
    """Worker loop: pull live-room tasks from the shared ``rds`` queue and scrape them.

    Runs forever. On each iteration it:

    1. Pops one JSON task (``room_id``, ``uid``, ``scrape_time`` and an
       optional ``times`` counter) via ``rds.get_live_info()``; if none is
       available it sleeps 0.1 s and polls again.
    2. On every 10th pass of a task, calls
       ``MysqlDyLiveMarketRecord().is_monitor(room_id, uid)`` and drops the
       task (does not re-queue it) when that returns ``None``.
    3. If the previous scrape was more than 60 s ago, re-queues the task with
       a fresh ``scrape_time``, fetches the room with ``RoomInfo.get_data`` and
       pushes the result via ``rds.push_data_list``; otherwise re-queues the
       task after a 0.1 s pause.

    Outcomes are reported with ``rds.record_score_v2(1)`` (success) and
    ``rds.record_score_v2(0)`` (failure).

    Relies on the module-level ``rds`` global, which is only created in the
    ``if __name__ == "__main__"`` section below.
    """
    while True:
        # Reset every iteration so the exception handler never hits an
        # unbound name (exception before assignment on the first pass) and
        # never logs a stale ID left over from a previous task.
        room_id = ''
        try:
            # Take one live room from the pending-scrape queue.
            room_info = rds.get_live_info()
            # Nothing queued: back off briefly (0.1 s) and poll again.
            if room_info is None:
                time.sleep(0.1)
                continue

            # Unpack the task fields.
            room_dict = json.loads(room_info)
            raw_room_id = room_dict['room_id']  # live room ID
            room_id = str(raw_room_id)
            scrape_time = room_dict['scrape_time']  # previous scrape time, epoch seconds
            uid = room_dict['uid']  # streamer ID
            # Validate the raw value: str(None) == 'None', so testing the
            # converted string could never detect a missing room ID.
            if uid is None or raw_room_id is None:
                PrintLog.print(
                    time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + '请求数据数据异常!' + '\n' + room_info
                )
                time.sleep(1)
                continue

            # Count passes through the queue; every 10th pass, re-check
            # whether this room should still be monitored.
            room_dict.setdefault('times', 0)
            room_dict['times'] = (room_dict['times'] + 1) % 10
            if room_dict['times'] == 0:
                live_info = MysqlDyLiveMarketRecord().is_monitor(room_id, uid)
                if live_info is None:
                    # No live record any more: drop the task, do not re-queue it.
                    continue

            time_diff = int(time.time()) - int(scrape_time)
            if time_diff > 60:
                # Stamp the new scrape time and re-queue *before* scraping, so
                # an exception later in this branch does not lose the task.
                room_dict['scrape_time'] = int(time.time())
                rds.push_live_info(json.dumps(room_dict))
                # Room IDs must be purely numeric; skip anything else.
                # NOTE(review): the task was already re-queued above, so an
                # invalid ID keeps circulating until the periodic is_monitor
                # check drops it — consider validating before re-queueing.
                if not room_id.isdigit():
                    continue
                PrintLog.print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ' + room_id + '开始抓取直播信息')
                response_json = RoomInfo.get_data(room_id=room_id)
                if response_json is None:
                    rds.record_score_v2(0)
                    PrintLog.print(
                        time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + '数据获取失败!响应数据为空!' + '\n' + room_id + '\n'
                    )
                    # Previously this fell through to response_json.get(...),
                    # raising AttributeError: the failure was scored twice and
                    # logged again by the generic exception handler.
                    time.sleep(0.1)
                    continue
                data = json.dumps({
                    "data": response_json.get('data').get('room'),
                    "extra": {
                        'room_id': room_id
                    },
                    "rt": True
                })
                print(time.strftime("%H:%M:%S", time.localtime()) + '爬取成功')
                rds.record_score_v2(1)
                rds.push_data_list(data)
            else:
                # Scraped within the last 60 s: put the task back (with its
                # updated pass counter) after a short pause.
                PrintLog.print('直播ID%s %d秒前曾爬取过,暂无需继续抓取' % (room_id, time_diff))
                time.sleep(0.1)
                rds.push_live_info(json.dumps(room_dict))
        except Exception as e:
            # Catch-all so one bad task does not terminate the worker thread.
            rds.record_score_v2(0)
            PrintLog.print(time.strftime("%H:%M:%S", time.localtime()) + ' ' + room_id + '数据异常:' + str(e))
            time.sleep(0.1)


if __name__ == "__main__":
    print("主方法开始执行")
    # Shared task queue; the workers reach it through this module-level name.
    rds = RdsRoomInfoRequestList()
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ' + ' 开始执行,待更新直播队列长度:' + str(rds.get_len()))
    # Start 49 worker threads (i = 1..49); each runs scrape() forever.
    for i in range(1, 50):
        task = threading.Thread(target=scrape, name=i)
        task.start()  # ready; waits to be scheduled by the interpreter