店播爬取Python脚本

live_replay_download.py 3.2KB

  1. import time
  2. import json
  3. import threading
  4. import requests
  5. import sys
  6. from rds_model.rds_live_replay_set import RdsLiveReplaySet
  7. from libs.mysql_live_replay import MysqlLiveReplay
  8. from log.print_log import PrintLog
  9. def live_download(live_data_json):
  10. live_data = json.loads(live_info_json)
  11. play_url = live_data['play_urls']
  12. live_data['live_id'] = str(live_data['live_id'])
  13. # for play_url in live_data['play_urls']:
  14. try:
  15. # 判断是否为重试任务
  16. is_retry = MysqlLiveReplay().is_retry_task(live_data['id'])
  17. if is_retry is None:
  18. MysqlLiveReplay().set_task_begin(live_data['id'], '1')
  19. else:
  20. MysqlLiveReplay().set_task_status(live_data['id'], '1')
  21. # requests.get返回的是一个可迭代对象(Iterable),此时Python SDK会通过Chunked Encoding方式上传。
  22. input_stream = requests.get(play_url, stream=True)
  23. PrintLog.print(live_data['live_id'] + ' 开始下载:' + play_url)
  24. f = open(sys.path[0] + '/file/' + live_data['live_id'] + ".flv", "ab+") # 有则追加,没有则创建
  25. # 将视频数据分片写入文件
  26. for chunk in input_stream.iter_content(chunk_size=512):
  27. if chunk:
  28. f.write(chunk)
  29. f.close()
  30. PrintLog.print(live_data['live_id'] + ' 下载完成:' + play_url)
  31. tmp_url = 'http://39.107.156.19:8006/' + live_data['live_id'] + '.flv'
  32. res = MysqlLiveReplay().set_video_url(live_data['id'], tmp_url)
  33. return
  34. except Exception as eMsg:
  35. PrintLog.print(live_data['live_id'] + '下载异常:' + play_url + "\r\n" + str(eMsg))
  36. MysqlLiveReplay().set_task_status(live_data['id'], '3')
  37. def get_active_thread_set():
  38. # 获取爬取中的直播ID集合
  39. active_thread_set = set()
  40. for active_thread in threading.enumerate(): # 线程列表
  41. if active_thread.getName() != 'MainThread':
  42. active_thread_set.add(active_thread.getName())
  43. return active_thread_set
  44. if __name__ == "__main__":
  45. print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ' + "主方法开始执行")
  46. rds = RdsLiveReplaySet()
  47. while True:
  48. # 从待下载的直播集合中获取一条
  49. live_info_json = rds.get_live_info()
  50. # 若暂无待下载的直播信息,停止1s
  51. if live_info_json is None:
  52. time.sleep(1)
  53. continue
  54. try:
  55. live_info = json.loads(live_info_json)
  56. live_info['live_id'] = str(live_info['live_id'])
  57. thread_name = 'db_live_download_' + live_info['live_id']
  58. # 判断该直播信息是否在爬取中,有则跳过
  59. if thread_name in get_active_thread_set():
  60. print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ' + " %s 已经在执行" % thread_name)
  61. continue
  62. # 创建爬取任务
  63. task = threading.Thread(target=live_download, args=(live_info_json,), name=thread_name)
  64. task.start() # 准备就绪,等待cpu执行
  65. except Exception as e:
  66. print(e.args)
  67. PrintLog.print('下载直播抛出异常:'+ str(e) + '\n' + live_info_json)
  68. time.sleep(1)
  69. continue