店播爬取Python脚本

dy_userinfo_update.py 1.9KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. import time
  2. import threading
  3. import json
  4. from rds_model.rds_user_info_list import RdsUserInfoList
  5. from log.print_log import PrintLog
  6. from web_dy import WebDouYin
  def scrape(max_runtime=5 * 60):
      """Worker loop: fetch user info for every sec_uid taken from the wait queue.

      Each iteration takes one id from ``RdsUserInfoList.get_wait_update_user()``,
      requests it through ``WebDouYin.get_user_info()`` and, on success, pushes a
      JSON document ``{"data": <response['user']>, "extra": {"room_id": <sec_uid>}}``
      onto the result queue with ``put_user_info()``. Ids that could not be
      fetched (empty response or any exception) are handed back to the wait
      queue with ``put_wait_update_user()`` so they can be retried.

      Args:
          max_runtime: Number of seconds the worker keeps looping before it
              returns. Defaults to five minutes, the value that used to be
              hard-coded.

      Returns:
          None. All results are delivered through the queue object.
      """
      rds_list = RdsUserInfoList()
      web_dy = WebDouYin()
      # Monotonic clock: the time budget must not be affected by wall-clock jumps.
      deadline = time.monotonic() + max_runtime
      while time.monotonic() <= deadline:
          # Reset on every pass so the except-branch can never see an unbound
          # name or requeue the id left over from the previous iteration.
          sec_uid = None
          try:
              sec_uid = rds_list.get_wait_update_user()
              if sec_uid is None:
                  # Nothing to do right now; back off briefly and poll again.
                  time.sleep(0.1)
                  continue
              sec_uid = str(sec_uid)
              PrintLog.print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ' + sec_uid + '开始抓取用户信息')
              response_json = web_dy.get_user_info(sec_uid)
              if response_json is None:
                  # NOTE(review): the original pushed the bare sec_uid onto the
                  # result queue here and then crashed on ``None.get``, which
                  # ended up requeueing it through the except-branch. Requeue
                  # explicitly instead and skip the rest of the iteration.
                  rds_list.put_wait_update_user(sec_uid)
                  PrintLog.print(
                      time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + '数据获取失败!响应数据为空!' + '\n'
                      + sec_uid + '\n'
                  )
                  time.sleep(0.1)
                  continue
              data = json.dumps({
                  "data": response_json.get('user'),
                  "extra": {
                      # The key is called room_id but carries the sec_uid; kept
                      # unchanged because whatever reads this queue may rely on it.
                      'room_id': sec_uid
                  }
              })
              print('爬取成功')
              rds_list.put_user_info(data)
          except Exception as e:
              # Thread-level boundary: log, give the id back for a retry, carry on.
              print('爬取失败')
              if sec_uid is not None:
                  rds_list.put_wait_update_user(sec_uid)
              PrintLog.print(time.strftime("%H:%M:%S", time.localtime()) + ' ' + str(sec_uid) + '数据异常:' + str(e))
              time.sleep(0.1)
  if __name__ == "__main__":
      # Script entry point: report the queue length, then start the workers.
      print("主方法开始执行")
      rds = RdsUserInfoList()
      launch_stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
      pending_count = str(rds.get_len())
      print(launch_stamp + ' ' + ' 开始执行,待更新直播队列长度:' + pending_count)
      # range(1, 50) yields 1..49, so 49 worker threads are started.
      for worker_no in range(1, 50):
          # start() only marks the thread runnable; the scheduler decides when it runs.
          threading.Thread(target=scrape, name=worker_no).start()