import json
import threading
import time

from libs.dy_barrage_info import DyBarrageInfo
from libs.mysql_dy_live import MysqlDyLive
from log.print_log import PrintLog
from rds_model.rds_dy_barrage_request_list import RdsDyBarrageRequestList

start_time = time.time()


def scrape():
    while True:
        stream_id = ''  # keep a value for the exception handler below
        try:
            # Pop one live-room entry from the pending-scrape queue
            room_info = rds.get_request_param()

            # Nothing to scrape yet: wait a second and try again
            if room_info is None:
                time.sleep(1)
                continue

            # Check whether this room is due for scraping; if so, scrape it
            # and push it back onto the tail of the queue afterwards
            room_dict = json.loads(room_info)
            stream_id = room_dict.get('room_id')        # live-room ID
            scrape_time = room_dict.get('scrape_time')  # last scrape timestamp

            if stream_id is None or scrape_time is None:
                time.sleep(1)
                continue

            stream_id = str(stream_id)
            print(time.strftime("%H:%M:%S", time.localtime()) + ' live-room ID: %s' % stream_id)

            room_dict.setdefault('times', 0)
            room_dict['times'] = (room_dict['times'] + 1) % 10

            # Every tenth pass, check whether the stream has gone off the air
            if room_dict['times'] == 0:
                live_info = MysqlDyLive().get_live_info(stream_id)
                if live_info is None:
                    # The stream no longer exists: do not requeue it
                    continue
                pk_id, room_id, status = live_info
                if int(status) == 4:
                    # The stream has ended: do not requeue it
                    continue

            # Stream is still live: scrape only if the last scrape is old enough,
            # then push the entry back onto the queue
            time_diff = int(time.time()) - int(scrape_time)
            if time_diff > 5:
                # Update the scrape timestamp and requeue before scraping
                room_dict['scrape_time'] = int(time.time())
                rds.push_request_id(json.dumps(room_dict))

                response_json = DyBarrageInfo.get_data(stream_id)
                if not response_json:
                    time.sleep(0.1)
                    continue
                rds.push_data_list(json.dumps(response_json))
            else:
                PrintLog.print('Live-room %s was scraped %d seconds ago; no need to scrape yet' % (stream_id, time_diff))
                rds.push_request_id(json.dumps(room_dict))
        except Exception as e:
            PrintLog.print(time.strftime("%H:%M:%S", time.localtime()) + ' ' + str(stream_id) + ' data error: ' + str(e))
            time.sleep(0.1)


if __name__ == "__main__":
    print('Main thread starting')

    rds = RdsDyBarrageRequestList()
    print(
        time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        + ' starting; pending barrage-scrape queue length: '
        + str(rds.get_len())
    )

    # Maximum number of threads allowed for barrage scraping
    scrape_max_threading = rds.max_threading()

    # range(1, n) would start only n - 1 threads; use range(n) instead
    for i in range(int(scrape_max_threading)):
        task = threading.Thread(target=scrape, name=str(i))
        task.start()
    # All threads started; they run until the process is stopped