店播爬取Python脚本

live_replay_upload.py 3.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. import time
  2. import sys
  3. import os
  4. from log.print_log import PrintLog
  5. from libs.mysql_live_replay import MysqlLiveReplay
  6. from libs.ali_oss import AliOss, oss2
  7. def upload_to_oss(live_id, id):
  8. try:
  9. file_name = live_id + '.flv'
  10. local_path = sys.path[0] + '/file/' + file_name
  11. oss_path = 'shop_live_replay/' + file_name
  12. PrintLog.print('开始上传')
  13. bucket = AliOss.get_auth_bucket()
  14. # bucket.put_object_from_file(oss_path, local_path, progress_callback=percentage)
  15. # 获取文件大小
  16. total_size = os.path.getsize(local_path)
  17. print(total_size)
  18. part_size = oss2.determine_part_size(total_size, preferred_size=1000 * 1024 * 1000)
  19. # 初始化分片上传,得到Upload ID。接下来的接口都要用到这个Upload ID。
  20. upload_id = bucket.init_multipart_upload(oss_path).upload_id
  21. # 逐个上传分片
  22. # 其中oss2.SizedFileAdapter()把file_obj转换为一个新的文件对象,新的文件对象可读的长度等于size_to_upload
  23. with open(local_path, 'rb') as file_obj:
  24. parts = []
  25. part_number = 1
  26. offset = 0
  27. while offset < total_size:
  28. size_to_upload = min(part_size, total_size - offset)
  29. result = bucket.upload_part(oss_path, upload_id, part_number,
  30. oss2.SizedFileAdapter(file_obj, size_to_upload))
  31. parts.append(oss2.models.PartInfo(part_number, result.etag, size=size_to_upload, part_crc=result.crc))
  32. offset += size_to_upload
  33. part_number += 1
  34. # 完成分片上传
  35. bucket.complete_multipart_upload(oss_path, upload_id, parts)
  36. # 验证一下
  37. with open(local_path, 'rb') as file_obj:
  38. assert bucket.get_object(oss_path).read() == file_obj.read()
  39. PrintLog.print(live_id + ' 上传完成')
  40. MysqlLiveReplay().set_task_status(id, '7')
  41. # 删除磁盘上的直播回放文件
  42. os.remove(local_path)
  43. PrintLog.print(live_id + ' 删除文件成功')
  44. except Exception as e:
  45. PrintLog.print('上传文件抛出异常:' + str(e))
  46. MysqlLiveReplay().set_task_status(id, '8')
  47. raise e
  48. pass
  49. # 当无法确定待上传的数据长度时,total_bytes的值为None。
  50. def percentage(consumed_bytes, total_bytes):
  51. global start_time
  52. current_time = time.time()
  53. current_consumed_m_bytes = round(consumed_bytes / 1024 / 1024, 2)
  54. rate = int(100 * (float(consumed_bytes) / float(total_bytes)))
  55. diff_time = round(current_time - start_time)
  56. if total_bytes:
  57. print('\r{0}% '.format(rate), end='')
  58. sys.stdout.flush()
  59. if diff_time > 60:
  60. start_time = current_time
  61. PrintLog.print('\r已上传' + str(current_consumed_m_bytes) + 'M (' + str(rate) + '%)')
  62. def upload_replay():
  63. PrintLog.print('开始上传直播回放')
  64. # 获取需要上传的直播信息
  65. replay_info = MysqlLiveReplay().get_replay_upload_info()
  66. if replay_info is None:
  67. time.sleep(60)
  68. return
  69. id, live_id = replay_info
  70. MysqlLiveReplay().set_task_status(id, '6')
  71. upload_to_oss(live_id, id)
  72. if __name__ == "__main__":
  73. while True:
  74. try:
  75. # hour = int(time.strftime("%H", time.localtime()))
  76. # if hour > 17:
  77. # PrintLog.print('当前时间段不进行上传操作')
  78. # time.sleep(3600)
  79. # continue
  80. start_time = time.time()
  81. upload_replay()
  82. except Exception as e:
  83. PrintLog.print('抛出异常' + str(e))
  84. time.sleep(1)
  85. continue