123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- import time
- import sys
- import os
- from log.print_log import PrintLog
- from libs.mysql_live_replay import MysqlLiveReplay
- from libs.ali_oss import AliOss, oss2
def _oss_object_matches_file(bucket, oss_path, local_path, chunk_size=1024 * 1024):
    """Return True when the OSS object at *oss_path* equals the file at *local_path*.

    The object is streamed and compared piece by piece so that neither the
    remote object nor the local file has to be held in memory in full.
    """
    remote = bucket.get_object(oss_path)
    with open(local_path, 'rb') as local_file:
        while True:
            remote_chunk = remote.read(chunk_size)
            if not remote_chunk:
                # Remote stream is exhausted: equal only if the local file is too.
                return not local_file.read(1)
            # The remote stream may return fewer bytes than requested, so read
            # exactly as many local bytes as were actually received.
            if local_file.read(len(remote_chunk)) != remote_chunk:
                return False


def upload_to_oss(live_id, id):
    """Upload the local replay file ``<live_id>.flv`` to OSS with a multipart upload.

    The file is read from ``sys.path[0] + '/file/'`` and stored under the
    ``shop_live_replay/`` prefix. After the upload completes, the remote object
    is compared with the local file; on success the task status is set to
    ``'7'`` and the local file is deleted.

    Args:
        live_id: Live identifier; concatenated with ``'.flv'`` to build the
            file name, so it is expected to be a string.
        id: Task identifier passed to ``MysqlLiveReplay().set_task_status``.

    Raises:
        RuntimeError: The uploaded object does not match the local file.
        Exception: Any other error is logged, the task status is set to
            ``'8'``, an unfinished multipart upload is aborted, and the
            original exception is re-raised.
    """
    bucket = None
    oss_path = None
    # Holds the Upload ID only while a multipart upload is open and unfinished,
    # so the error handler knows whether there is something to abort.
    upload_id = None
    try:
        file_name = live_id + '.flv'
        local_path = sys.path[0] + '/file/' + file_name
        oss_path = 'shop_live_replay/' + file_name
        PrintLog.print('开始上传')
        bucket = AliOss.get_auth_bucket()

        # Get the file size.
        total_size = os.path.getsize(local_path)
        print(total_size)
        part_size = oss2.determine_part_size(total_size, preferred_size=1000 * 1024 * 1000)

        # Initialise the multipart upload to obtain the Upload ID; every
        # following multipart call needs it.
        upload_id = bucket.init_multipart_upload(oss_path).upload_id

        # Upload the parts one by one. oss2.SizedFileAdapter() wraps file_obj in
        # a new file object whose readable length equals size_to_upload.
        with open(local_path, 'rb') as file_obj:
            parts = []
            part_number = 1
            offset = 0
            while offset < total_size:
                size_to_upload = min(part_size, total_size - offset)
                result = bucket.upload_part(oss_path, upload_id, part_number,
                                            oss2.SizedFileAdapter(file_obj, size_to_upload))
                parts.append(oss2.models.PartInfo(part_number, result.etag,
                                                  size=size_to_upload, part_crc=result.crc))
                offset += size_to_upload
                part_number += 1

        # Finish the multipart upload.
        bucket.complete_multipart_upload(oss_path, upload_id, parts)
        upload_id = None  # completed: nothing left to abort

        # Verify the upload with an explicit check (an ``assert`` would be
        # stripped under ``python -O``) before the local copy is deleted.
        if not _oss_object_matches_file(bucket, oss_path, local_path):
            raise RuntimeError('上传后校验失败: OSS 文件与本地文件不一致')

        PrintLog.print(live_id + ' 上传完成')
        MysqlLiveReplay().set_task_status(id, '7')

        # Delete the replay file from local disk.
        os.remove(local_path)
        PrintLog.print(live_id + ' 删除文件成功')
    except Exception as e:
        PrintLog.print('上传文件抛出异常:' + str(e))
        if upload_id is not None:
            # Best effort: drop the already-uploaded parts so they do not stay
            # behind on OSS. A failure here must not hide the original error.
            try:
                bucket.abort_multipart_upload(oss_path, upload_id)
            except Exception as abort_error:
                PrintLog.print('取消分片上传失败:' + str(abort_error))
        MysqlLiveReplay().set_task_status(id, '8')
        raise
# total_bytes is None when the length of the data being uploaded cannot be determined.
def percentage(consumed_bytes, total_bytes):
    """Print upload progress as a percentage and log it about once a minute.

    Has the ``(consumed_bytes, total_bytes)`` shape of a progress callback; the
    only use visible in this file is a commented-out
    ``put_object_from_file(..., progress_callback=percentage)`` call.

    Reads and updates the module-level ``start_time``, which holds the time of
    the last periodic log line (or the start of the current upload cycle).

    Args:
        consumed_bytes: Number of bytes transferred so far.
        total_bytes: Total number of bytes, or ``None``/``0`` when unknown.
            In that case no percentage can be computed and nothing is printed.
    """
    global start_time
    if not total_bytes:
        # Guard before dividing: float(None) raises TypeError and a zero total
        # raises ZeroDivisionError.
        return

    current_time = time.time()
    current_consumed_m_bytes = round(consumed_bytes / 1024 / 1024, 2)
    rate = int(100 * (float(consumed_bytes) / float(total_bytes)))
    diff_time = round(current_time - start_time)

    # '\r' rewrites the same console line on every call.
    print('\r{0}% '.format(rate), end='')
    sys.stdout.flush()

    if diff_time > 60:
        # More than a minute since the last log line: reset the reference time
        # and write a progress line to the log.
        start_time = current_time
        PrintLog.print('\r已上传' + str(current_consumed_m_bytes) + 'M (' + str(rate) + '%)')
def upload_replay():
    """Fetch one pending replay task and upload its file to OSS.

    Asks ``MysqlLiveReplay().get_replay_upload_info()`` for the next task. If
    there is none, the function sleeps for 60 seconds and returns. Otherwise
    the result is unpacked as ``(id, live_id)``, the task status is set to
    ``'6'`` and the file is handed to ``upload_to_oss``.
    """
    PrintLog.print('开始上传直播回放')

    # Look up the live replay that needs uploading.
    pending = MysqlLiveReplay().get_replay_upload_info()
    if pending is not None:
        task_id, live_id = pending
        MysqlLiveReplay().set_task_status(task_id, '6')
        upload_to_oss(live_id, task_id)
        return

    # Nothing to upload right now: wait a minute before the caller polls again.
    time.sleep(60)
if __name__ == "__main__":
    # Poll forever: each cycle handles at most one replay task.
    #
    # A disabled time-window guard used to sit at the top of the loop: when the
    # local hour was greater than 17 it logged a notice, slept for 3600 seconds
    # and skipped the cycle.
    while True:
        try:
            # Reference time for the once-a-minute progress log in percentage().
            start_time = time.time()
            upload_replay()
        except Exception as exc:
            # Top-level boundary: log the failure, pause briefly and keep polling.
            PrintLog.print('抛出异常' + str(exc))
            time.sleep(1)
|