"""Multi-threaded worker script that fetches live-user info for queued uids.

Each worker thread repeatedly takes a uid from ``RdsUserLiveRequestList``,
fetches its data through ``DyUserLiveInfo.get_data`` and pushes the result
to the data list. Failed uids are pushed back onto the request queue.
"""
import time
import threading
import json

from rds_model.rds_user_live_request_list import RdsUserLiveRequestList
from libs.dy_user_live_info import DyUserLiveInfo
from log.print_log import PrintLog


def _timestamp(fmt="%Y-%m-%d %H:%M:%S"):
    """Return the current local time formatted with *fmt*."""
    return time.strftime(fmt, time.localtime())


def scrape():
    """Run an endless fetch loop for one worker thread.

    Per iteration:
      * take a uid with ``get_request_param``; sleep 2s when it returns None;
      * silently drop uids that are not purely numeric;
      * fetch the payload with ``DyUserLiveInfo.get_data``;
      * on an empty response or a missing ``data.user`` entry, record a
        score of 0 and re-queue the uid;
      * on success, record a score of 1 and push the JSON-encoded result.

    Any exception is logged, scored as 0, and the uid being processed (if
    one was fetched in this iteration) is re-queued. The function never
    returns.
    """
    rds_user_info_list = RdsUserLiveRequestList()

    while True:
        # Reset per iteration so the except handler never sees an unbound
        # name or a stale uid left over from a previous iteration.
        uid = None
        try:
            # Use this worker's own queue handle instead of the global `rds`
            # that only exists when the file is run as a script.
            # NOTE(review): assumes every RdsUserLiveRequestList instance
            # addresses the same underlying queue - confirm.
            uid = rds_user_info_list.get_request_param()
            if uid is None:
                # Nothing to do right now; back off before polling again.
                time.sleep(2)
                continue

            uid = str(uid)

            # A uid that is not purely numeric is skipped (and not re-queued).
            if not uid.isdigit():
                continue

            PrintLog.print(_timestamp() + ' ' + uid + ' 开始抓取用户信息')

            res = DyUserLiveInfo.get_data(uid=uid)

            if (res == '') or (res is None):
                # Empty response: count as a failure and retry the uid later.
                rds_user_info_list.record_score(0)
                rds_user_info_list.push_request_param(uid)
                PrintLog.print(
                    _timestamp()
                    + '数据获取失败!响应数据为空!'
                    + '\n'
                    + uid
                    + '\n'
                )
                continue

            response_json = json.loads(res)

            # A response without a `data` object raises here and is handled
            # by the except branch below (scored 0 and re-queued).
            if response_json.get('data').get('user') is None:
                rds_user_info_list.record_score(0)
                rds_user_info_list.push_request_param(uid)
                PrintLog.print(
                    _timestamp()
                    + '数据获取失败!达人数据为空!'
                    + '\n'
                    + uid
                    + '\n'
                    + res
                )
                continue

            data = json.dumps({
                "data": response_json.get('data'),
                "extra": {
                    'uid': uid
                }
            })

            print('抓取成功')
            rds_user_info_list.record_score(1)
            rds_user_info_list.push_data_list(data)

        except Exception as e:
            print('抓取失败')
            rds_user_info_list.record_score(0)
            # Only re-queue when a uid was actually fetched this iteration;
            # otherwise there is nothing valid to push back.
            if uid is not None:
                rds_user_info_list.push_request_param(uid)
            PrintLog.print(
                _timestamp("%H:%M:%S") + ' ' + str(uid) + '数据异常:' + str(e)
            )

        # NOTE(review): the original file was collapsed onto one line, so the
        # position of this pause is assumed to be at loop level (after the
        # try/except) rather than inside the except branch - confirm.
        time.sleep(0.1)


if __name__ == "__main__":
    print("主方法开始执行")

    rds = RdsUserLiveRequestList()
    print(
        _timestamp()
        + ' '
        + ' 开始执行,待爬取弹幕直播队列长度:'
        + str(rds.get_user_live_monitor_len())
    )

    # range(1, 100) starts 99 worker threads, named "1" .. "99".
    for i in range(1, 100):
        task = threading.Thread(target=scrape, name=str(i))
        task.start()  # Ready; the thread runs once the scheduler picks it up.