import json
import sys
import threading
import time

import requests

from libs.mysql_live_replay import MysqlLiveReplay
from log.print_log import PrintLog
from rds_model.rds_live_replay_set import RdsLiveReplaySet

# Base URL under which downloaded .flv files are published (recorded via set_video_url).
VIDEO_BASE_URL = 'http://39.107.156.19:8006/'
# Worker threads are named PREFIX + live_id so the main loop can detect in-flight downloads.
THREAD_NAME_PREFIX = 'db_live_download_'


def live_download(live_data_json):
    """Download one live stream to ``<sys.path[0]>/file/<live_id>.flv`` and record the result.

    :param live_data_json: JSON string of one task taken from the Redis set. The keys
        read are ``id``, ``live_id`` and ``play_urls``. ``play_urls`` is presumably a
        single URL string, since it is passed to ``requests.get`` and concatenated into
        log messages (TODO confirm against the producer).

    Any exception raised after parsing is logged and the task status is set to '3'.
    Parse errors propagate and end the thread, as before.
    """
    # Parse the argument this thread was started with. The previous version parsed the
    # module-level ``live_info_json``, which the main loop keeps reassigning, so a
    # worker could pick up a different task than the one it was created for.
    live_data = json.loads(live_data_json)
    play_url = live_data['play_urls']
    live_data['live_id'] = str(live_data['live_id'])
    try:
        # A task with no existing record gets a "begin" row. A retried task only has
        # its status reset. '1' presumably means "downloading" (confirm with MysqlLiveReplay).
        if MysqlLiveReplay().is_retry_task(live_data['id']) is None:
            MysqlLiveReplay().set_task_begin(live_data['id'], '1')
        else:
            MysqlLiveReplay().set_task_status(live_data['id'], '1')

        file_path = sys.path[0] + '/file/' + live_data['live_id'] + ".flv"
        # stream=True: consume the body incrementally instead of buffering it in memory.
        # The response is closed (connection released) when the with-block exits.
        with requests.get(play_url, stream=True) as input_stream:
            # Do not save an HTTP error page as video and report it as a finished download.
            input_stream.raise_for_status()
            PrintLog.print(live_data['live_id'] + ' 开始下载:' + play_url)
            # 'ab+' appends to an existing file (e.g. from an earlier attempt) or creates it.
            with open(file_path, "ab+") as f:
                for chunk in input_stream.iter_content(chunk_size=512):
                    if chunk:  # skip keep-alive / empty chunks
                        f.write(chunk)
        PrintLog.print(live_data['live_id'] + ' 下载完成:' + play_url)

        tmp_url = VIDEO_BASE_URL + live_data['live_id'] + '.flv'
        MysqlLiveReplay().set_video_url(live_data['id'], tmp_url)
    except Exception as eMsg:
        # Broad catch on purpose: this is the top of a worker thread. Log the error and
        # mark the task '3' (presumably "failed"; confirm) so it can be picked up again.
        PrintLog.print(live_data['live_id'] + '下载异常:' + play_url + "\r\n" + str(eMsg))
        MysqlLiveReplay().set_task_status(live_data['id'], '3')


def get_active_thread_set():
    """Return the names of all currently alive threads except the main thread.

    Download workers are named ``THREAD_NAME_PREFIX + live_id``, so this set tells the
    main loop which live streams are already being downloaded.
    """
    return {thread.name for thread in threading.enumerate() if thread.name != 'MainThread'}


def main():
    """Poll Redis forever and start one download thread per pending live stream."""
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ' + "主方法开始执行")
    rds = RdsLiveReplaySet()
    while True:
        # Take one pending live-stream task from the queue.
        live_info_json = rds.get_live_info()
        # Nothing pending: back off for one second.
        if live_info_json is None:
            time.sleep(1)
            continue
        try:
            live_info = json.loads(live_info_json)
            live_info['live_id'] = str(live_info['live_id'])
            thread_name = THREAD_NAME_PREFIX + live_info['live_id']
            # Skip the task if a worker for this live_id is already running.
            if thread_name in get_active_thread_set():
                print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ' + " %s 已经在执行" % thread_name)
                continue
            # Start the worker. The task's own JSON is passed explicitly as its argument.
            task = threading.Thread(target=live_download, args=(live_info_json,), name=thread_name)
            task.start()
        except Exception as e:
            print(e.args)
            PrintLog.print('下载直播抛出异常:' + str(e) + '\n' + live_info_json)
            time.sleep(1)
            continue


if __name__ == "__main__":
    main()