瀏覽代碼

直播回放上传至OSS

houxiaohua 3 年之前
父節點
當前提交
2f8c2215cf
共有 5 個文件被更改,包括 92 次插入164 次删除
  1. 2 2
      libs/ali_oss.py
  2. 0 75
      libs/mysql_kwai_show_video.py
  3. 13 0
      libs/mysql_live_replay.py
  4. 77 0
      live_replay_upload.py
  5. 0 87
      show_video_download.py

+ 2 - 2
libs/ali_oss.py

@@ -4,10 +4,10 @@ class AliOss:
4 4
 
5 5
     AccessKeyId = 'LYu38mijwUW440A4'
6 6
     AccessKeySecret = '5yQum5XXdqlP9lx8YQojwyrciB0hu4'
7
-    BucketName = 'kx-bigdata'
7
+    BucketName = 'shop-live'
8 8
     EndPoint = 'http://oss-cn-beijing.aliyuncs.com/'
9 9
 
10
-    BaseUrl = 'https://kx-bigdata.oss-cn-beijing.aliyuncs.com/live_video/'
10
+    BaseUrl = 'shop-live.oss-cn-beijing.aliyuncs.com/shop_live/'
11 11
 
12 12
     @classmethod
13 13
     def get_auth_bucket(cls):

+ 0 - 75
libs/mysql_kwai_show_video.py

@@ -1,75 +0,0 @@
1
-import time
2
-from libs.db_mysql import DbMysql
3
-
4
-
5
-class MysqlKwaiShowVideo(DbMysql):
6
-
7
-    table = 'kwai_show_video'
8
-
9
-    def set_video_status(self, id, status):
10
-        sql = 'UPDATE ' + self.table + ' SET `status`=%s WHERE `id`=%s'
11
-        try:
12
-            # 批量插入数据
13
-            result = self.cursor.execute(sql, [status, id])
14
-            return result
15
-        finally:
16
-            # 提交到数据库执行
17
-            self.db.commit()
18
-            # 最终关闭数据库连接
19
-            self.cursor.close()
20
-            self.db.close()
21
-
22
-    def get_video_by_status(self, status):
23
-        sql = 'SELECT id,live_id FROM ' + self.table + ' WHERE `status`=%s LIMIT 1'
24
-        try:
25
-            # 批量插入数据
26
-            self.cursor.execute(sql, [status])
27
-            return self.cursor.fetchone()
28
-        finally:
29
-            # 提交到数据库执行
30
-            self.db.commit()
31
-            # 最终关闭数据库连接
32
-            self.cursor.close()
33
-            self.db.close()
34
-
35
-    def get_downloaded_video(self):
36
-        last_download_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()) - 3600))
37
-        sql = 'SELECT id,live_id FROM ' + self.table + ' WHERE `status`=2 AND `download_at`<\'' + last_download_at + '\' LIMIT 1'
38
-        try:
39
-            # 查询数据
40
-            self.cursor.execute(sql)
41
-            return self.cursor.fetchone()
42
-        finally:
43
-            # 提交到数据库执行
44
-            self.db.commit()
45
-            # 最终关闭数据库连接
46
-            self.cursor.close()
47
-            self.db.close()
48
-
49
-    def set_video_tmp_url(self, id, url):
50
-        current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
51
-        sql = 'UPDATE ' + self.table + ' SET `status`=2,video_url=%s,download_at=%s WHERE `id`=%s'
52
-        try:
53
-            # 批量插入数据
54
-            result = self.cursor.execute(sql, [url, current_time, id])
55
-            return result
56
-        finally:
57
-            # 提交到数据库执行
58
-            self.db.commit()
59
-            # 最终关闭数据库连接
60
-            self.cursor.close()
61
-            self.db.close()
62
-
63
-    def set_video_url(self, id, url):
64
-        current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
65
-        sql = 'UPDATE ' + self.table + ' SET `status`=5,video_url=%s,uploaded_at=%s WHERE `id`=%s'
66
-        try:
67
-            # 批量插入数据
68
-            result = self.cursor.execute(sql, [url, current_time, id])
69
-            return result
70
-        finally:
71
-            # 提交到数据库执行
72
-            self.db.commit()
73
-            # 最终关闭数据库连接
74
-            self.cursor.close()
75
-            self.db.close()

+ 13 - 0
libs/mysql_live_replay.py

@@ -30,3 +30,16 @@ class MysqlLiveReplay(DbMysql):
30 30
             self.db.commit()  # 提交
31 31
             self.cursor.close()  # 关闭cursor
32 32
             self.db.close()  # 关闭db连接
33
+
34
+    # 获取需要上传的直播信息
35
+    def get_replay_upload_info(self):
36
+        last_download_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()) - 3600))
37
+        sql = 'SELECT id,live_id FROM ' + self.table + ' WHERE `status`=2 AND `download_at`<\'' + last_download_at + '\' LIMIT 1'
38
+        try:
39
+            # 执行SQL
40
+            self.cursor.execute(sql)
41
+            return self.cursor.fetchone()
42
+        finally:
43
+            self.db.commit()  # 提交
44
+            self.cursor.close()  # 关闭cursor
45
+            self.db.close()  # 关闭db连接

+ 77 - 0
live_replay_upload.py

@@ -0,0 +1,77 @@
1
+import time
2
+import sys
3
+import os
4
+
5
+from log.print_log import PrintLog
6
+from libs.mysql_live_replay import MysqlLiveReplay
7
+from libs.ali_oss import AliOss
8
+
9
+
10
+def upload_to_oss(live_id):
11
+    try:
12
+        file_name = live_id + '.flv'
13
+        local_path = sys.path[0] + '/file/' + file_name
14
+        oss_path = 'shop_live_replay/' + file_name
15
+
16
+        PrintLog.print('开始上传')
17
+        bucket = AliOss.get_auth_bucket()
18
+        bucket.put_object_from_file(oss_path, local_path, progress_callback=percentage)
19
+        PrintLog.print(live_id + ' 上传完成')
20
+
21
+        # 删除磁盘上的直播回放文件
22
+        try:
23
+            os.remove(local_path)
24
+            PrintLog.print(live_id + ' 删除文件成功')
25
+        except Exception as e:
26
+            PrintLog.print('删除文件抛出异常:' + str(e))
27
+    except Exception as e:
28
+        PrintLog.print('上传文件抛出异常:' + str(e))
29
+        raise e
30
+        pass
31
+
32
+
33
+# 当无法确定待上传的数据长度时,total_bytes的值为None。
34
+def percentage(consumed_bytes, total_bytes):
35
+    global start_time
36
+    current_time = time.time()
37
+    current_consumed_m_bytes = round(consumed_bytes / 1024 / 1024, 2)
38
+    rate = int(100 * (float(consumed_bytes) / float(total_bytes)))
39
+    diff_time = round(current_time - start_time)
40
+
41
+    if total_bytes:
42
+        print('\r{0}% '.format(rate), end='')
43
+        sys.stdout.flush()
44
+    if diff_time > 60:
45
+        start_time = current_time
46
+        PrintLog.print('\r已上传' + str(current_consumed_m_bytes) + 'M (' + str(rate) + '%)')
47
+
48
+
49
+def upload_replay():
50
+    PrintLog.print('开始上传直播回放')
51
+    # 获取需要上传的直播信息
52
+    replay_info = MysqlLiveReplay().get_replay_upload_info()
53
+
54
+    if replay_info is None:
55
+        time.sleep(60)
56
+        return
57
+
58
+    id, live_id = replay_info
59
+
60
+    upload_to_oss(live_id)
61
+
62
+
63
+if __name__ == "__main__":
64
+    while True:
65
+        try:
66
+            hour = int(time.strftime("%H", time.localtime()))
67
+            if hour > 17:
68
+                PrintLog.print('当前时间段不进行上传操作')
69
+                time.sleep(3600)
70
+                continue
71
+
72
+            start_time = time.time()
73
+            upload_replay()
74
+        except Exception as e:
75
+            PrintLog.print('抛出异常' + str(e))
76
+            time.sleep(1)
77
+            continue

+ 0 - 87
show_video_download.py

@@ -1,87 +0,0 @@
1
-#!/usr/bin/python3
2
-# coding=utf-8
3
-# -*- coding: utf-8 -*-
4
-
5
-import requests
6
-import time
7
-import json
8
-import threading
9
-from libs.rds_wait_upload_show_video_set import RdsWaitUploadShowVideoSet
10
-from libs.mysql_kwai_show_video import MysqlKwaiShowVideo
11
-from log.print_log import PrintLog
12
-
13
-
14
-def download_video(live_video_params):
15
-    params = json.loads(live_video_params)
16
-
17
-    for play_url in params['play_urls']:
18
-        try:
19
-            MysqlKwaiShowVideo().set_video_status(params['id'], '1')
20
-
21
-            # requests.get返回的是一个可迭代对象(Iterable),此时Python SDK会通过Chunked Encoding方式上传。
22
-            input = requests.get(play_url, stream=True)
23
-
24
-            PrintLog.print(params['live_id'] + ' 开始下载:' + play_url)
25
-
26
-            f = open('file/' + params['live_id'] + ".flv", "ab")  # 有则追加,没有则创建
27
-
28
-            # 将视频数据分片写入文件
29
-            for chunk in input.iter_content(chunk_size=512):
30
-                if chunk:
31
-                    f.write(chunk)
32
-
33
-            f.close()
34
-
35
-            PrintLog.print(params['live_id'] + ' 下载完成:' + play_url)
36
-
37
-            tmp_url = 'https://ks.wenxingshuju.com/showvideo/' + params['live_id'] + '.flv'
38
-            res = MysqlKwaiShowVideo().set_video_tmp_url(params['id'], tmp_url)
39
-            return
40
-        except Exception as e:
41
-            PrintLog.print(params['live_id'] + '下载异常:' + play_url + "\r\n" + str(e))
42
-            MysqlKwaiShowVideo().set_video_status(params['id'], '3')
43
-            continue
44
-
45
-
46
-def get_active_thread_set():
47
-    # 待爬取的直播ID集合
48
-    active_thread_set = set()
49
-    for active_thread in threading.enumerate():  # 线程列表
50
-        if active_thread.getName() != 'MainThread':
51
-            active_thread_set.add(active_thread.getName())
52
-
53
-    return active_thread_set
54
-
55
-
56
-if __name__ == "__main__":
57
-
58
-    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ' + "主方法开始执行")
59
-
60
-    rds_set = RdsWaitUploadShowVideoSet()
61
-
62
-    while True:
63
-
64
-        # 从 待下载直播视频集合中 列表中获取一条
65
-        live_video_params = rds_set.pop_params()
66
-
67
-        # 如果没有,则等一秒,循环
68
-        if live_video_params is None:
69
-            time.sleep(1)
70
-            continue
71
-
72
-        try:
73
-            params = json.loads(live_video_params)
74
-
75
-            thread_name = 'show_download_' + params['live_id']
76
-            # 判断得到的直播间ID在爬取中
77
-            # 判断线程中是否已经有该任务,有则跳过
78
-            if thread_name in get_active_thread_set():
79
-                print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ' + " %s 已经在执行" % thread_name)
80
-                continue
81
-
82
-            task = threading.Thread(target=download_video, args=(live_video_params,), name=thread_name)
83
-            task.start()  # 准备就绪,等待cpu执行
84
-        except Exception as e:
85
-            PrintLog.print('抛出异常:' + str(e) + '\n' + live_video_params)
86
-            time.sleep(1)
87
-            continue