"""Worker script that drains the pending-user queue and stores scraped user info.

Each worker thread repeatedly takes a ``sec_uid`` from ``RdsUserInfoList``,
fetches the user's info through ``WebDouYin`` and pushes the JSON-encoded
result back through ``RdsUserInfoList.put_user_info``.
"""
import time
import threading
import json

from rds_model.rds_user_info_list import RdsUserInfoList
from log.print_log import PrintLog
from web_dy import WebDouYin


def scrape():
    """Process queued user ids for roughly five minutes, then return.

    For every ``sec_uid`` obtained from the queue:

    * if the fetch returns ``None``, the id is put back on the wait queue and
      the failure is logged once;
    * otherwise the ``user`` part of the response is wrapped together with the
      id and stored via ``put_user_info``.

    Any exception raised while handling one id is logged and the loop moves
    on; the id is *not* re-queued in that case (re-queueing on exceptions was
    disabled in the original code and is left disabled here).
    """
    rds_list = RdsUserInfoList()
    web_dy = WebDouYin()
    start_time = int(time.time())

    # Each worker stops on its own once it has been running for over 5 minutes.
    while int(time.time()) - start_time <= 5 * 60:
        # Reset per iteration so the except handler never reads an unbound
        # or stale id when the queue read itself fails.
        sec_uid = None
        try:
            sec_uid = rds_list.get_wait_update_user()

            if sec_uid is not None:
                sec_uid = str(sec_uid)

                PrintLog.print(
                    time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                    + ' ' + sec_uid + '开始抓取用户信息'
                )
                response_json = web_dy.get_user_info(sec_uid)

                if response_json is None:
                    # Empty response: hand the id back for a later retry and
                    # skip serialization (calling .get() on None would raise
                    # and produce a second, misleading error log).
                    rds_list.put_wait_update_user(sec_uid)
                    PrintLog.print(
                        time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                        + '数据获取失败!响应数据为空!'
                        + '\n'
                        + sec_uid
                        + '\n'
                    )
                else:
                    data = json.dumps({
                        "data": response_json.get('user'),
                        "extra": {
                            'room_id': sec_uid
                        }
                    })

                    print('爬取成功')
                    rds_list.put_user_info(data)
        except Exception as e:
            # Top-level boundary of the worker loop: log and keep going so a
            # single bad id cannot kill the thread.
            print('爬取失败,' + str(sec_uid))
            PrintLog.print(
                time.strftime("%H:%M:%S", time.localtime())
                + ' ' + str(sec_uid) + '数据异常:' + str(e)
            )

        # Short pause between iterations, also when the queue was empty.
        time.sleep(0.1)


if __name__ == "__main__":
    print("主方法开始执行")

    rds = RdsUserInfoList()
    print(
        time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        + ' ' + ' 开始执行,待更新直播队列长度:' + str(rds.get_len())
    )

    # NOTE(review): range(1, 30) starts 29 workers (names "1".."29"); confirm
    # whether 30 were intended.
    for i in range(1, 30):
        task = threading.Thread(target=scrape, name=str(i))
        task.start()  # thread is ready and waits to be scheduled