2 Commits eceee65aa3 ... 361c5eb0f8

Author SHA1 Message Date
  chenzhiyuan 361c5eb0f8 更新脚本 3 years ago
  chenzhiyuan 259d7f7ad3 增加抖音达人更新 3 years ago
2 changed files with 88 additions and 0 deletions
  1. 61 0
      dy_userinfo_update.py
  2. 27 0
      rds_model/rds_user_info_list.py

+ 61 - 0
dy_userinfo_update.py

@@ -0,0 +1,61 @@
1
+import time
2
+import threading
3
+import json
4
+
5
+
6
+from rds_model.rds_user_info_list import RdsUserInfoList
7
+from log.print_log import PrintLog
8
+from web_dy import WebDouYin
9
+
10
+
11
def scrape(max_run_seconds=5 * 60, idle_sleep=0.1):
    """Worker loop that refreshes Douyin user info for queued ``sec_uid`` values.

    Each iteration pops one ``sec_uid`` from the waiting queue, fetches the
    user info through ``WebDouYin.get_user_info`` and pushes the JSON-encoded
    result onto the result queue.  A fetch that returns ``None`` or raises is
    put back on the waiting queue so it can be retried.

    Args:
        max_run_seconds: Wall-clock budget in seconds; the loop exits once
            it has been running longer than this.
        idle_sleep: Pause in seconds between iterations and while the
            waiting queue is empty.
    """
    rds_list = RdsUserInfoList()
    web_dy = WebDouYin()
    start_time = int(time.time())

    while int(time.time()) - start_time <= max_run_seconds:
        # Reset on every iteration so the error handler never re-queues a
        # uid left over from a previous iteration (or hits an unbound name
        # when the very first pop fails).
        sec_uid = None
        try:
            raw_uid = rds_list.get_wait_update_user()
            if raw_uid is None:
                # Queue is empty: back off briefly and poll again.
                time.sleep(idle_sleep)
                continue

            # A Redis client without response decoding returns bytes;
            # str(b'x') would produce "b'x'", so decode explicitly.
            if isinstance(raw_uid, bytes):
                sec_uid = raw_uid.decode('utf-8')
            else:
                sec_uid = str(raw_uid)

            PrintLog.print(
                time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                + ' ' + sec_uid + '开始抓取用户信息'
            )
            response_json = web_dy.get_user_info(sec_uid)

            if response_json is None:
                # Empty response: schedule a retry instead of pushing the
                # bare uid onto the result queue, and skip serialization.
                print('爬取失败')
                rds_list.put_wait_update_user(sec_uid)
                PrintLog.print(
                    time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                    + '数据获取失败!响应数据为空!' + '\n'
                    + sec_uid + '\n'
                )
            else:
                data = json.dumps({
                    "data": response_json.get('user'),
                    "extra": {
                        # NOTE(review): the key is named 'room_id' but holds
                        # the sec_uid; kept as-is because the consumer of
                        # the result queue is not visible here.
                        'room_id': sec_uid,
                    },
                })

                print('爬取成功')
                rds_list.put_user_info(data)

        except Exception as e:
            print('爬取失败')
            # Only re-queue when a uid was actually taken off the queue in
            # this iteration.
            if sec_uid is not None:
                rds_list.put_wait_update_user(sec_uid)
            PrintLog.print(
                time.strftime("%H:%M:%S", time.localtime())
                + ' ' + str(sec_uid) + '数据异常:' + str(e)
            )
        time.sleep(idle_sleep)
52
+
53
+
54
if __name__ == "__main__":
    # Script entry point: report the current backlog, then fan out the
    # scraper worker threads.
    print("主方法开始执行")

    rds = RdsUserInfoList()
    started_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    backlog = str(rds.get_len())
    print(started_at + ' ' + ' 开始执行,待更新直播队列长度:' + backlog)

    # range(1, 50) yields 49 workers, named 1..49.
    workers = [threading.Thread(target=scrape, name=n) for n in range(1, 50)]
    for worker in workers:
        # start() only marks the thread runnable; the scheduler decides
        # when it actually executes.
        worker.start()

+ 27 - 0
rds_model/rds_user_info_list.py

@@ -0,0 +1,27 @@
1
+from rds_model.db_redis import DbRedis
2
+
3
+
4
class RdsUserInfoList:
    """Redis-list helpers for the Douyin user-info update pipeline.

    Two lists are used: one holds the users waiting to be updated, the
    other holds the already-fetched user info payloads.
    """

    # Users waiting for a user-info update.
    WAITING_KEY = 'BrandLiveData.DyWaitingUpdateUserInfoList'
    # Fetched user-info payloads.
    RESULT_KEY = 'BrandLiveData.DyUpdateUserInfoList'

    def __init__(self):
        self.redis = DbRedis.connect()

    def get_len(self):
        """Return the length of the waiting-update user list."""
        return self.redis.llen(self.WAITING_KEY)

    def get_wait_update_user(self):
        """Pop and return one entry from the head of the waiting list."""
        return self.redis.lpop(self.WAITING_KEY)

    def put_wait_update_user(self, sec_uid):
        """Push ``sec_uid`` onto the head of the waiting list.

        Because entries are also popped from the head, a re-queued user is
        the next one to be taken.

        Returns:
            Whatever the client's ``lpush`` call returns.
        """
        return self.redis.lpush(self.WAITING_KEY, sec_uid)

    def put_user_info(self, data):
        """Append fetched user info ``data`` to the tail of the result list.

        Returns:
            Whatever the client's ``rpush`` call returns, mirroring
            ``put_wait_update_user``.
        """
        return self.redis.rpush(self.RESULT_KEY, data)
27
+