import time
import sys
import os

from log.print_log import PrintLog
from libs.mysql_live_replay import MysqlLiveReplay
from libs.ali_oss import AliOss, oss2

# Preferred multipart part size in bytes, handed to oss2.determine_part_size.
_PREFERRED_PART_SIZE = 1000 * 1024 * 1000

# Time of the last periodic progress log line written by percentage().
# Defined at import time so percentage() works even when this module is not
# run as a script; the main loop resets it before every upload attempt.
start_time = time.time()


def _multipart_upload(bucket, oss_path, local_path):
    """Upload local_path to oss_path on bucket using OSS multipart upload.

    If uploading a part or completing the upload fails, the multipart
    session is aborted (best effort) so already-uploaded parts are not left
    behind in the bucket, and the original exception is re-raised.
    """
    total_size = os.path.getsize(local_path)
    print(total_size)
    part_size = oss2.determine_part_size(
        total_size, preferred_size=_PREFERRED_PART_SIZE)

    # Every following multipart call is tied to this upload ID.
    upload_id = bucket.init_multipart_upload(oss_path).upload_id
    try:
        parts = []
        with open(local_path, 'rb') as file_obj:
            part_number = 1
            offset = 0
            while offset < total_size:
                size_to_upload = min(part_size, total_size - offset)
                # SizedFileAdapter wraps file_obj so that exactly
                # size_to_upload bytes are readable for this part.
                result = bucket.upload_part(
                    oss_path, upload_id, part_number,
                    oss2.SizedFileAdapter(file_obj, size_to_upload))
                parts.append(oss2.models.PartInfo(
                    part_number, result.etag,
                    size=size_to_upload, part_crc=result.crc))
                offset += size_to_upload
                part_number += 1
        bucket.complete_multipart_upload(oss_path, upload_id, parts)
    except Exception:
        # Best-effort cleanup; a failure here must not hide the real error.
        try:
            bucket.abort_multipart_upload(oss_path, upload_id)
        except Exception as abort_error:
            PrintLog.print('取消分片上传失败:' + str(abort_error))
        raise


def _verify_upload(bucket, oss_path, local_path):
    """Raise AssertionError if the object at oss_path differs from local_path.

    An explicit check is used instead of an ``assert`` statement so the
    verification still runs under ``python -O``.
    """
    # NOTE(review): both copies are read fully into memory; for very large
    # recordings a chunked or checksum-based comparison may be preferable.
    with open(local_path, 'rb') as file_obj:
        if bucket.get_object(oss_path).read() != file_obj.read():
            raise AssertionError(oss_path + ' 上传校验失败')


def upload_to_oss(live_id, id):
    """Upload the replay file of a live stream to OSS and record the result.

    The file ``<script dir>/file/<live_id>.flv`` is uploaded to
    ``shop_live_replay/<live_id>.flv``, downloaded again for verification
    and, on success, removed from the local disk.

    Args:
        live_id: Live stream identifier (string); also the base file name.
        id: Task identifier passed to ``MysqlLiveReplay.set_task_status``.
            (Shadows the builtin, but kept for caller compatibility.)

    Raises:
        Exception: Any failure is logged, the task status is set to '8',
            and the original exception is re-raised unchanged.
    """
    try:
        file_name = live_id + '.flv'
        local_path = sys.path[0] + '/file/' + file_name
        oss_path = 'shop_live_replay/' + file_name
        PrintLog.print('开始上传')
        bucket = AliOss.get_auth_bucket()
        # Simple single-request alternative, with progress reporting:
        #   bucket.put_object_from_file(oss_path, local_path,
        #                               progress_callback=percentage)

        _multipart_upload(bucket, oss_path, local_path)
        _verify_upload(bucket, oss_path, local_path)

        PrintLog.print(live_id + ' 上传完成')
        # Status '7' is recorded once the upload has been verified.
        MysqlLiveReplay().set_task_status(id, '7')

        # Remove the replay file from the local disk.
        os.remove(local_path)
        PrintLog.print(live_id + ' 删除文件成功')
    except Exception as e:
        PrintLog.print('上传文件抛出异常:' + str(e))
        # Status '8' is recorded whenever any step above fails.
        MysqlLiveReplay().set_task_status(id, '8')
        raise  # bare raise keeps the original traceback intact


def percentage(consumed_bytes, total_bytes):
    """Progress callback: print the upload percentage to stdout.

    The percentage is rewritten in place on every call; additionally, when
    more than 60 seconds have passed since ``start_time``, a progress line
    is written through ``PrintLog`` and ``start_time`` is reset.

    Args:
        consumed_bytes: Number of bytes uploaded so far.
        total_bytes: Total number of bytes to upload, or ``None`` when the
            length of the data is unknown; nothing is reported in that case.
    """
    global start_time
    # Without a (non-zero) total no percentage can be computed.
    if not total_bytes:
        return

    current_time = time.time()
    rate = int(100 * (float(consumed_bytes) / float(total_bytes)))
    print('\r{0}% '.format(rate), end='')
    sys.stdout.flush()

    if round(current_time - start_time) > 60:
        start_time = current_time
        current_consumed_m_bytes = round(consumed_bytes / 1024 / 1024, 2)
        PrintLog.print('\r已上传' + str(current_consumed_m_bytes)
                       + 'M (' + str(rate) + '%)')


def upload_replay():
    """Fetch one pending replay upload task and process it.

    When ``get_replay_upload_info`` returns ``None`` the function sleeps for
    60 seconds and returns. Otherwise the task status is set to '6' and the
    replay is uploaded via ``upload_to_oss``.
    """
    PrintLog.print('开始上传直播回放')
    # Look up the live stream whose replay needs to be uploaded.
    replay_info = MysqlLiveReplay().get_replay_upload_info()
    if replay_info is None:
        time.sleep(60)
        return

    task_id, live_id = replay_info
    # Status '6' is recorded right before the upload starts.
    MysqlLiveReplay().set_task_status(task_id, '6')
    upload_to_oss(live_id, task_id)


def main():
    """Process replay upload tasks forever, logging and retrying on errors."""
    global start_time
    while True:
        try:
            # NOTE: a time-of-day restriction (skip uploads when the local
            # hour is past 17 and sleep for an hour) used to sit here and
            # is currently disabled.
            start_time = time.time()
            upload_replay()
        except Exception as e:
            PrintLog.print('抛出异常' + str(e))
            # Short pause so a persistent failure does not spin the loop.
            time.sleep(1)


if __name__ == "__main__":
    main()