店播爬取Python脚本

dy_live_reward.py 3.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. import time
  2. import threading
  3. import json
  4. import sys
  5. from rds_model.rds_dy_live_reward_request_list import RdsDyLiveRewardRequestList
  6. from log.print_log import PrintLog
  7. from libs.mysql_dy_live import MysqlDyLive
  8. from libs.dy_live_reward_info import DyLiveRewardInfo
  9. def scrape():
  10. while True:
  11. try:
  12. # 从 待爬取直播间 列表中获取一个 直播间信息
  13. room_info = rds.get_request_param()
  14. # 如果没有待爬取的直播,则等一秒,循环
  15. if room_info is None:
  16. time.sleep(0.1)
  17. continue
  18. # 判断是否到达爬取时间以确定是否需要爬取弹幕,并直接塞回队列尾部
  19. room_dict = json.loads(room_info)
  20. room_id = str(room_dict['room_id']) # 直播间ID
  21. scrape_time = room_dict['scrape_time'] # 上次抓取时间
  22. uid = room_dict['uid'] # 直播网红ID
  23. if (uid is None) or (room_id is None):
  24. PrintLog.print(
  25. time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + '请求数据数据异常!' + '\n'
  26. + room_info
  27. )
  28. time.sleep(1)
  29. continue
  30. room_dict.setdefault('times', 0)
  31. room_dict['times'] = (room_dict['times'] + 1) % 10
  32. # 按照策略间隔性判断直播是否已停播
  33. if room_dict['times'] == 0: # 每爬取十次判断一次是否已关播
  34. # 判断直播是否结束
  35. live_info = MysqlDyLive().get_live_info(room_id)
  36. if live_info is None: # 直播不存在,不再回塞数据到爬取队列
  37. continue
  38. pk_id, room_id, status = live_info
  39. if int(status) == 4: # 直播已结束不再回塞数据到爬取队列
  40. continue
  41. time_diff = int(time.time()) - int(scrape_time)
  42. if time_diff > 120:
  43. # 爬取前更新爬取时间塞回队列
  44. room_dict['scrape_time'] = int(time.time())
  45. rds.push_request_id(json.dumps(room_dict))
  46. response_json = DyLiveRewardInfo.get_data(room_id)
  47. if (response_json.get('data') is None) or (response_json.get('data').get('ranks') is None):
  48. PrintLog.print(time.strftime("%H:%M:%S", time.localtime()) + ' 获取打赏数据异常:' + room_id + ' ' + uid)
  49. rds.record_score(0)
  50. continue
  51. # 没有商品
  52. if len(response_json.get('data').get('ranks')) == 0:
  53. PrintLog.print(time.strftime("%H:%M:%S", time.localtime()) + ' 没有打赏数据:' + room_id + ' ' + uid)
  54. continue
  55. data = json.dumps({
  56. "data": response_json.get('data'),
  57. "extra": {
  58. 'room_id': room_id,
  59. 'uid': uid,
  60. }
  61. })
  62. rds.record_score(1)
  63. rds.push_data_list(data)
  64. else:
  65. print('直播ID%s %d秒前曾爬取过,暂无需继续抓取' % (room_id, time_diff))
  66. time.sleep(0.1)
  67. rds.push_request_id(json.dumps(room_dict))
  68. except Exception as e:
  69. PrintLog.print(time.strftime("%H:%M:%S", time.localtime()) + ' ' + room_id + '数据异常:' + str(e))
  70. time.sleep(0.1)
  71. if __name__ == "__main__":
  72. print("主方法开始执行")
  73. rds = RdsDyLiveRewardRequestList()
  74. print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ' + ' 开始执行,待爬取直播队列长度:' + str(rds.get_len()))
  75. for i in range(1, 80):
  76. task = threading.Thread(target=scrape, name=i)
  77. task.start() # 准备就绪,等待cpu执行