店播爬取Python脚本

dy_live_info_rt.py 3.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. import time
  2. import threading
  3. import json
  4. from rds_model.rds_room_info_request_list import RdsRoomInfoRequestList
  5. from libs.mysql_dy_live_market_record import MysqlDyLiveMarketRecord
  6. from libs.room_info import RoomInfo
  7. from log.print_log import PrintLog
  def _timestamp(fmt="%Y-%m-%d %H:%M:%S"):
      """Return the current local time formatted with *fmt* (used as a log prefix)."""
      return time.strftime(fmt, time.localtime())


  def scrape():
      """Worker loop: consume live-room requests from the queue and scrape them.

      Runs forever. Each iteration takes one JSON-encoded room request from the
      module-global ``rds`` client (created in the ``__main__`` block) and:

      * drops the request if its ``room_id`` or ``uid`` is ``None``;
      * on every 10th pass for a room, calls
        ``MysqlDyLiveMarketRecord().is_monitor`` and drops the room if that
        returns ``None``;
      * if the room was last scraped more than 60 seconds ago, re-queues it
        with a fresh ``scrape_time``, fetches its info via ``RoomInfo.get_data``
        and pushes the serialized result with ``rds.push_data_list``, recording
        a success (1) or failure (0) score;
      * otherwise re-queues it unchanged after a short pause.

      Any exception is logged, scored as a failure, and the loop continues.
      """
      while True:
          # Reset every pass so the error log never reports a stale room from a
          # previous iteration, and so the name is always bound in `except`.
          room_id = ''
          try:
              # Take one pending room request off the to-scrape queue.
              room_info = rds.get_live_info()
              if room_info is None:
                  # Nothing queued: back off briefly (0.1 s) and poll again.
                  time.sleep(0.1)
                  continue

              room_dict = json.loads(room_info)
              raw_room_id = room_dict['room_id']      # live room ID
              scrape_time = room_dict['scrape_time']  # last scrape time (compared with int(time.time()))
              uid = room_dict['uid']                  # streamer ID
              # Validate BEFORE str(): str(None) == 'None' would slip past a None check.
              if uid is None or raw_room_id is None:
                  PrintLog.print(
                      _timestamp() + '请求数据数据异常!' + '\n'
                      + room_info
                  )
                  time.sleep(1)
                  continue
              room_id = str(raw_room_id)

              # Pass counter carried inside the queued request; every 10th pass,
              # check whether the room should still be monitored.
              room_dict['times'] = (room_dict.get('times', 0) + 1) % 10
              if room_dict['times'] == 0:
                  if MysqlDyLiveMarketRecord().is_monitor(room_id, uid) is None:
                      # No monitoring record: drop the room (do not re-queue it).
                      continue

              time_diff = int(time.time()) - int(scrape_time)
              if time_diff <= 60:
                  # Scraped recently: put it back unchanged and move on.
                  PrintLog.print('直播ID%s %d秒前曾爬取过,暂无需继续抓取' % (room_id, time_diff))
                  time.sleep(0.1)
                  rds.push_live_info(json.dumps(room_dict))
                  continue

              # Stamp the new scrape time and re-queue BEFORE scraping, so the
              # room stays in rotation even if this scrape fails.
              room_dict['scrape_time'] = int(time.time())
              rds.push_live_info(json.dumps(room_dict))
              # Only purely numeric room IDs are valid.
              # NOTE(review): invalid IDs have already been re-queued above, so they
              # keep cycling until the 10-pass is_monitor check drops them — confirm.
              if not room_id.isdigit():
                  continue

              PrintLog.print(_timestamp() + ' ' + room_id + '开始抓取直播信息')
              response_json = RoomInfo.get_data(room_id=room_id)
              if response_json is None:
                  rds.record_score_v2(0)
                  PrintLog.print(
                      _timestamp() + '数据获取失败!响应数据为空!' + '\n'
                      + room_id + '\n'
                  )
                  # Stop here: continuing would call .get() on None, raise
                  # AttributeError and record this failure a second time.
                  time.sleep(0.1)
                  continue

              data = json.dumps({
                  "data": response_json.get('data').get('room'),
                  "extra": {
                      'room_id': room_id
                  },
                  "rt": True
              })
              print(_timestamp("%H:%M:%S") + '爬取成功')
              rds.record_score_v2(1)
              rds.push_data_list(data)
          except Exception as e:
              # Top-level guard of a long-running worker thread: score the
              # failure, log it, and keep looping rather than letting the thread die.
              rds.record_score_v2(0)
              PrintLog.print(_timestamp("%H:%M:%S") + ' ' + room_id + '数据异常:' + str(e))
              time.sleep(0.1)
  if __name__ == "__main__":
      # Script entry point: create the shared request-list client, report the
      # current backlog, then start the worker threads that run scrape().
      print("主方法开始执行")
      # Bound at module level on purpose: scrape() reads this global `rds`.
      rds = RdsRoomInfoRequestList()
      started_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
      pending = str(rds.get_len())
      print(started_at + ' ' + ' 开始执行,待更新直播队列长度:' + pending)
      # NOTE(review): range(1, 50) starts 49 workers (named 1..49), not 50 — confirm intended.
      for worker_no in range(1, 50):
          worker = threading.Thread(target=scrape, name=worker_no)
          # Threads are never joined; as non-daemon threads they keep the process alive.
          worker.start()