123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- import time
- import json
- import threading
- import requests
- import sys
- from rds_model.rds_live_replay_set import RdsLiveReplaySet
- from libs.mysql_live_replay import MysqlLiveReplay
- from log.print_log import PrintLog
def live_download(live_data_json, timeout=(10, 60)):
    """Download one live stream to ``<sys.path[0]>/file/<live_id>.flv`` and record its URL.

    Runs as the target of a worker thread started by the main loop.

    Args:
        live_data_json: JSON text of one queued record. The keys read here are
            ``id`` (task id passed to MysqlLiveReplay), ``live_id`` and ``play_urls``.
        timeout: ``(connect, read)`` timeout in seconds forwarded to ``requests.get``.
            With ``stream=True`` the read timeout bounds the gap between received
            bytes, not the total download time. It keeps a stalled stream from
            hanging this thread forever. Pass ``None`` to wait indefinitely.

    Side effects:
        Marks the task status ``'1'`` (via set_task_begin or set_task_status) before
        downloading. Appends the stream bytes to the ``.flv`` file, then stores the
        served URL via ``set_video_url``. On any exception inside the download it
        logs the error and sets the task status to ``'3'``.
    """
    # Parse the argument, not the module-level ``live_info_json``. The main loop
    # rebinds that global for each new record, so reading it here could pick up
    # another task's data.
    live_data = json.loads(live_data_json)
    # NOTE(review): 'play_urls' is used as a single URL string. The plural key
    # suggests it may once have been a list — confirm against the producer.
    play_url = live_data['play_urls']
    live_id = str(live_data['live_id'])
    live_data['live_id'] = live_id
    task_id = live_data['id']
    try:
        # Presumably is_retry_task returns None for a first attempt; a new task gets
        # a begin record, while a retried one only has its status reset.
        is_retry = MysqlLiveReplay().is_retry_task(task_id)
        if is_retry is None:
            MysqlLiveReplay().set_task_begin(task_id, '1')
        else:
            MysqlLiveReplay().set_task_status(task_id, '1')
        file_path = sys.path[0] + '/file/' + live_id + ".flv"
        # stream=True makes iter_content yield the body incrementally instead of
        # buffering the whole stream in memory.
        with requests.get(play_url, stream=True, timeout=timeout) as response:
            # Don't save an HTTP error page as video and report it as a success.
            response.raise_for_status()
            PrintLog.print(live_id + ' 开始下载:' + play_url)
            # Append mode: an existing partial file is extended, otherwise created.
            with open(file_path, "ab+") as f:
                for chunk in response.iter_content(chunk_size=512):
                    if chunk:  # skip keep-alive empty chunks
                        f.write(chunk)
        PrintLog.print(live_id + ' 下载完成:' + play_url)
        tmp_url = 'http://39.107.156.19:8006/' + live_id + '.flv'
        MysqlLiveReplay().set_video_url(task_id, tmp_url)
    except Exception as eMsg:
        # Top-level boundary of the worker thread: log and mark the task failed.
        PrintLog.print(live_id + '下载异常:' + play_url + "\r\n" + str(eMsg))
        MysqlLiveReplay().set_task_status(task_id, '3')
def get_active_thread_set():
    """Return the set of names of all alive threads other than ``'MainThread'``.

    The main loop names each download thread ``db_live_download_<live_id>``.
    Membership in this set therefore tells it whether a live stream is already
    being downloaded.
    """
    # Thread.getName() is deprecated since Python 3.10; ``.name`` is the
    # supported equivalent and returns the same value.
    return {thread.name for thread in threading.enumerate() if thread.name != 'MainThread'}
if __name__ == "__main__":
    # Scheduler loop: repeatedly fetch one pending record from RdsLiveReplaySet and
    # hand it to a named download thread, one thread per live_id.
    started_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    print(started_at + ' ' + "主方法开始执行")
    replay_set = RdsLiveReplaySet()
    while True:
        # Fetch one record from the set of lives waiting to be downloaded.
        live_info_json = replay_set.get_live_info()
        if live_info_json is None:
            # Nothing pending: back off for a second before polling again.
            time.sleep(1)
            continue
        try:
            parsed = json.loads(live_info_json)
            worker_name = 'db_live_download_' + str(parsed['live_id'])
            # A thread with this name means the live is already downloading: skip it.
            if worker_name in get_active_thread_set():
                busy_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                print(busy_at + ' ' + " %s 已经在执行" % worker_name)
                continue
            worker = threading.Thread(
                target=live_download,
                args=(live_info_json,),
                name=worker_name,
            )
            worker.start()
        except Exception as err:
            # Keep the scheduler alive on a bad record: report it, pause, move on.
            print(err.args)
            PrintLog.print('下载直播抛出异常:' + str(err) + '\n' + live_info_json)
            time.sleep(1)
|