店播爬取Python脚本

dy_barrage_scraper.py (3.2 KB, 89 lines)
  1. import time
  2. import threading
  3. import json
  4. from rds_model.rds_dy_barrage_request_list import RdsDyBarrageRequestList
  5. from log.print_log import PrintLog
  6. from libs.dy_barrage_info import DyBarrageInfo
  7. from libs.mysql_dy_live import MysqlDyLive
  8. start_time = time.time()
  9. def scrape():
  10. while True:
  11. try:
  12. # 从 待爬取直播间ID 列表中获取一个 直播间ID
  13. room_info = rds.get_request_param()
  14. # 如果没有待爬取的直播,则等一秒,循环
  15. if room_info is None:
  16. time.sleep(1)
  17. continue
  18. # 判断是否到达爬取时间以确定是否需要爬取弹幕,并直接塞回队列尾部
  19. room_dict = json.loads(room_info)
  20. stream_id = str(room_dict['room_id']) # 直播间ID
  21. scrape_time = room_dict['scrape_time'] # 上次抓取时间
  22. print(time.strftime("%H:%M:%S", time.localtime()) + '直播间ID为%s' % stream_id)
  23. if stream_id is None or scrape_time is None:
  24. time.sleep(1)
  25. continue
  26. room_dict.setdefault('times', 0)
  27. room_dict['times'] = (room_dict['times'] + 1) % 10
  28. # 按照策略间隔性判断直播是否已停播
  29. if room_dict['times'] == 0: # 每爬取十次判断一次是否已关播
  30. # 判断直播是否结束
  31. live_info = MysqlDyLive().get_live_info(stream_id)
  32. if live_info is None: # 直播不存在,不再回塞数据到爬取队列
  33. continue
  34. pk_id, room_id, status = live_info
  35. if int(status) == 4: # 直播已结束不再回塞数据到爬取队列
  36. continue
  37. # 直播未结束爬取完成塞回队列
  38. time_diff = int(time.time()) - int(scrape_time)
  39. if time_diff > 5:
  40. # 爬取前更新爬取时间塞回队列
  41. room_dict['scrape_time'] = int(time.time())
  42. rds.push_request_id(json.dumps(room_dict))
  43. response_json = DyBarrageInfo.get_data(stream_id)
  44. if not bool(response_json):
  45. time.sleep(0.1)
  46. continue
  47. else:
  48. data = json.dumps(response_json)
  49. rds.push_data_list(data)
  50. else:
  51. PrintLog.print('直播ID%s %d秒前曾爬取过,暂无需继续抓取' % (stream_id, time_diff))
  52. rds.push_request_id(json.dumps(room_dict))
  53. except Exception as e:
  54. PrintLog.print(time.strftime("%H:%M:%S", time.localtime()) + ' ' + stream_id + '数据异常:' + str(e))
  55. time.sleep(0.1)
  56. if __name__ == "__main__":
  57. print("主方法开始执行")
  58. rds = RdsDyBarrageRequestList()
  59. print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ' + ' 开始执行,待爬取弹幕直播队列长度:' + str(rds.get_len()))
  60. # 获取弹幕爬取允许最大线程数
  61. scrape_max_threading = rds.max_threading()
  62. for i in range(1, int(scrape_max_threading)):
  63. task = threading.Thread(target=scrape, name=i)
  64. task.start() # 准备就绪,等待cpu执行