import time
import threading
import json

from rds_model.rds_dy_barrage_list import RdsDouYinBarrageList
from libs.mysql_dy_live_market_record import MysqlDyLiveMarketRecord
from log.print_log import PrintLog
from libs.dy_barrage_info import DyBarrageInfo

start_time = time.time()

def scrape():
    while True:
        stream_id = ''  # defined up front so the except handler can always reference it
        try:
            # Pop one room ID from the list of live rooms waiting to be scraped
            room_info = rds.get_request_param()
            # If no room is waiting, sleep for one second and loop
            if room_info is None:
                time.sleep(1)
                continue
            # Check whether the scrape interval has elapsed to decide if the barrage
            # should be fetched, then push the room straight back to the queue tail
            room_dict = json.loads(room_info)
            stream_id = room_dict.get('room_id')  # live-room ID
            scrape_time = room_dict.get('scrape_time')  # timestamp of the last scrape
            # uid = room_dict.get('uid')  # streamer (influencer) ID
            if stream_id is None or scrape_time is None:
                time.sleep(1)
                continue
            stream_id = str(stream_id)
            room_dict.setdefault('times', 0)
            room_dict['times'] = (room_dict['times'] + 1) % 10
            if room_dict['times'] == 0:  # every tenth pass, check whether monitoring should continue
                live_info = MysqlDyLiveMarketRecord().new_monitor(stream_id)
                if live_info is None:  # no live record: drop the room instead of requeueing it
                    continue
            # The stream is still live: scrape, then push the room back onto the queue
            time_diff = int(time.time()) - int(scrape_time)
            if time_diff > 1:
                # Update the scrape timestamp and requeue before fetching
                room_dict['scrape_time'] = int(time.time())
                rds.push_request_id(json.dumps(room_dict))
                response_json = DyBarrageInfo.get_data(stream_id)
                if not response_json:
                    time.sleep(0.1)
                    continue
                rds.push_data_list(json.dumps(response_json))
            else:
                PrintLog.print('Room %s was scraped %d seconds ago; no need to scrape yet' % (stream_id, time_diff))
                rds.push_request_id(json.dumps(room_dict))
        except Exception as e:
            PrintLog.print(time.strftime("%H:%M:%S", time.localtime()) + ' ' + str(stream_id) + ' data error: ' + str(e))
            time.sleep(0.1)

if __name__ == "__main__":
    print("Main thread starting")
    # rds is a module-level global read by every worker thread
    rds = RdsDouYinBarrageList()
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
          + ' starting; rooms waiting in the barrage scrape queue: ' + str(rds.get_len()))
    for i in range(1, 20):  # spawn 19 worker threads (range(1, 20) yields 1..19)
        task = threading.Thread(target=scrape, name=str(i))
        task.start()  # thread created and ready; runs when the CPU schedules it
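

# --- Hedged sketch (not part of the original source) ---
# RdsDouYinBarrageList is imported from rds_model but not shown in this listing.
# A minimal implementation consistent with how scrape() uses it (get_request_param
# pops a room-info JSON string, push_request_id requeues it, push_data_list stores
# the scraped barrage payload, get_len reports the queue length) could look like
# the class below. The Redis connection settings and key names are assumptions,
# not the project's actual values.
import redis


class RdsDouYinBarrageListSketch:
    REQUEST_KEY = 'dy:barrage:request_list'  # assumed key name
    DATA_KEY = 'dy:barrage:data_list'        # assumed key name

    def __init__(self, host='127.0.0.1', port=6379, db=0):
        self._client = redis.Redis(host=host, port=port, db=db, decode_responses=True)

    def get_request_param(self):
        # Pop one room-info JSON string from the head of the request queue;
        # returns None when the queue is empty, matching the caller's check.
        return self._client.lpop(self.REQUEST_KEY)

    def push_request_id(self, room_info_json):
        # Requeue the room at the tail so other rooms are served first.
        self._client.rpush(self.REQUEST_KEY, room_info_json)

    def push_data_list(self, data_json):
        # Append a scraped barrage payload for downstream consumers.
        self._client.rpush(self.DATA_KEY, data_json)

    def get_len(self):
        return self._client.llen(self.REQUEST_KEY)


# Usage sketch: seeding the queue with one room. The JSON fields mirror what
# scrape() reads (room_id and scrape_time; uid is optional). Values are made up.
# seed = {'room_id': 123456789, 'scrape_time': 0, 'uid': 0}
# RdsDouYinBarrageListSketch().push_request_id(json.dumps(seed))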