Store livestream scraping Python script

scraper.py 16KB

#!/usr/bin/python3
# -*- coding: utf-8 -*-
import random
import websocket
import redis
import requests
import threading
import sys
import json
from barrage import MessageDecode
from libs.proxy import Proxy
try:
    import thread
except ImportError:
    import _thread as thread
import time


def get_rand_str(length):
    # Build a random string from the fixed charset used by the web client.
    charset = "bjectSymhasOwnProp0123456789ABCDEFGHIJKLMNQRTUVWXYZdfgiklquvxz"
    rand_str = ''
    for _ in range(0, length):
        rand_str += random.choice(charset)
    return rand_str


def get_page_id():
    # Page ID format: 16 random characters + "_" + millisecond timestamp.
    page_id = get_rand_str(16)
    page_id += "_"
    page_id += str(int(time.time() * 1000))
    return page_id


def get_websocket_info(live_stream_id, retry=0):
    # Ask the Kuaishou GraphQL endpoint for the websocket URL and token of a live stream.
    param_json = {
        "operationName": "WebSocketInfoQuery",
        "variables": {
            "liveStreamId": live_stream_id
        },
        "query": "query WebSocketInfoQuery($liveStreamId: String) {\n webSocketInfo(liveStreamId: $liveStreamId) {\n token\n webSocketUrls\n __typename\n }\n}\n"
    }
    user_agents = {
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3908.2 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/82.0.4055.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36',
    }
    # Spare cookies kept from the original source; not referenced below.
    cookie_list = [
        'web_8f5a71d99d502219d7ce1ca1ffec68cc; clientid=3; client_key=65890b29; Hm_lvt_86a27b7db2c5c0ae37fee4a8a35033ee=1595212029,1597053525; userId=1089887853; kuaishou.live.bfb1s=477cb0011daca84b36b3a4676857e5a1; didv=1603703368000; kpn=GAME_ZONE; kuaishou.live.web_st=ChRrdWFpc2hvdS5saXZlLndlYi5zdBKgATKQJmTG8mk6lH2pOJeVa-184Sq95qCmEB1lP_q3lItdj6a6gnYgCIcPkt1ZvnJlE3cAkBVIiK46xUEyJ1SZe2hatA7MGwchvIhNlvumZQWfwUHwDOKWRkpNOn-4c0wYnHmgituvkv2B2XJC1lnaLBJ57zZU6iKrtgtXyekfNrf37VW3n0cpGF8LfjxvfQibwYfxewf2uOhDQzKnCA-kLosaEhqtgBlt9k0TlHg5Y4Goo9D9kyIgPsuUj-GlXMBsywOViBgpl-tt7OVb051_AZAHST9ItzAoBTAB; kuaishou.live.web_ph=8f83f1ed0c54c288352d86b3a23fe996499e; userId=1089887853',
        'kuaishou.live.bfb1s=7206d814e5c089a58c910ed8bf52ace5; clientid=3; client_key=65890b29; sid=83cf0fb9ee15fde6b6987781; did=web_c280cf49619a100d8305b6f095ed5348; userId=1509370340; didv=1600053596890; WEBLOGGER_HTTP_SEQ_ID=18928; WEBLOGGER_INCREAMENT_ID_KEY=19033; Hm_lvt_86a27b7db2c5c0ae37fee4a8a35033ee=1601391911; Hm_lpvt_86a27b7db2c5c0ae37fee4a8a35033ee=1602073621; kpn=GAME_ZONE; kuaishou.live.web_st=ChRrdWFpc2hvdS5saXZlLndlYi5zdBKgAcSZ7bO7s4CskEWV8oXNhqBeAW7qebb6Gv8_pmwLpeDLgzlIl3Nwgd2qal6895okBZxhkFFXQLIRvSJss1ALy5H2k0B9JvRZc9RNm4FzKmZ4eWPx2B3f-RLyiz6CkNpEsjcgpCQ90pQyBJgc9nWyo3vJRJS9fl53t2hFW29cvcHJktjSh4VRl-JrZ6WI3r3Et1kGxyh0wGqW3Q3xWzVMmIkaEsvpGUru20c-iIt7T0W8MQrXwiIgMbhYaFs2XBsAXpVKQoE6xmrFlpUw3VfiZaze7OM6mkEoBTAB; kuaishou.live.web_ph=6248010c3bbf0923d41c9d3c231b2903007e; userId=1509370340',
        'did=web_03d225bf290f905c1d1a5b3810af3b07; didv=1607676125725; kuaishou.live.bfb1s=9b8f70844293bed778aade6e0a8f9942; clientid=3; client_key=65890b29; kpn=GAME_ZONE; Hm_lvt_86a27b7db2c5c0ae37fee4a8a35033ee=1607676148; Hm_lpvt_86a27b7db2c5c0ae37fee4a8a35033ee=1607676148; userId=2020851346; userId=2020851346; kuaishou.live.web_st=ChRrdWFpc2hvdS5saXZlLndlYi5zdBKgAR00HvTWTmSGTYkxB2Mny84aCe3Lu8qGgMyP7Bhp7Y8lHqkC4TPLNt8H71ZesLiX4hpu9eTzOCiCmzXp7bCHQxzYt6_POWrTVT9L-jgFL4ET3JmTqrjxudwbSZQP-icUJACHgXEpf9a_BXvPS1RrBnl82Gyg1LwLXxIo0uIdwhheHrAp3u4h4C6S5lqWvZ7aNX1nDk_5Jsyq8UqsuCXLNiMaEgCrAu8bFEUPixNgRvVq1Nb0ZSIgKyTMpEkPIRE47p3L_U0hfzZ2IIBI9YfiD5jI34L2zmAoBTAB; kuaishou.live.web_ph=2f965caadcde0572313c46a7f8de50843381'
    ]
    # Take the least-used cookie (lowest failure score) from the sorted set, or fall back to a hard-coded one.
    res = redis_connection.zrangebyscore(scrape_cookie_key, 0, '+inf', start=0, num=1, withscores=True)
    if len(res) == 0:
        cookie = 'kuaishou.live.bfb1s=9b8f70844293bed778aade6e0a8f9942; clientid=3; did=web_2ba8ea2ca07df85a9e193a75d443d128; client_key=65890b29; kpn=GAME_ZONE; userId=1509370340; userId=1509370340; kuaishou.live.web_st=ChRrdWFpc2hvdS5saXZlLndlYi5zdBKgAZIqgEWU0YHOwebl3XEBuu7odxIyvSspUq1v_kvLGCVI_eURJXwii1_AN0CwuxScDBKoK2y2HIghAZLxMVzJhR0wQ4IHJ0zi3TtLsnrbNlA9VnnSZpuHK2b1M2RxEiFEKBRTK0AwnRzN7UOvkKQTfOIxtALOFXqSPOzJ4R8ScI30o_CqmgWPbg7dw0Kt5sgpGFYcZMCA1vgkFhiph-O4cE8aEjGueioax06vmORaF3eBQr3cQSIgsBvfCInpIGIqrTgvvo3648essPww4pkcAKNHOgeNHjUoBTAB; kuaishou.live.web_ph=d77227e666b43027931968af443b14bd7b28'
    else:
        cookie, times = res[0]
        print('Got cookie ' + str(int(times)) + ' ' + cookie[0:30])
    headers = {
        'user-agent': user_agents.pop(),  # pop() removes an arbitrary UA from the set
        'Cookie': cookie,
        'Referer': 'https://live.kuaishou.com/u/3xse' + live_stream_id
    }
    proxy = Proxy.get()
    print('Got proxy ' + proxy)
    proxies = {
        "http": "http://" + proxy,
        "https": "http://" + proxy
    }
    try:
        # https://live.kuaishou.com/live_graphql
        r = requests.post(
            "https://live.kuaishou.com/m_graphql",
            json=param_json,
            headers=headers,
            proxies=proxies,
            timeout=30
        )
        webSocketInfo = r.json()['data']['webSocketInfo']
        token = webSocketInfo.get('token')
        if token is None:
            # Count a failure against this cookie so it is deprioritised next time.
            redis_connection.zincrby(scrape_cookie_key, 1, cookie)
            # Proxy.del_proxy(proxy)
            print('Failed to get token')
            # Give up after more than 10 retries.
            if retry > 10:
                return None, None
            time.sleep(2)
            retry = retry + 1
            return get_websocket_info(live_stream_id, retry)
        url = webSocketInfo['webSocketUrls'][0]
        return url, token
    except Exception as e:
        print('Exception while getting token: ' + str(e))
        return get_websocket_info(live_stream_id, retry)


def on_message(ws, message):
    global redis_connection
    # Binary frame -> list of byte values for the barrage decoder.
    data = [m for m in message]
    messageDecode = MessageDecode(data)
    if messageDecode.decode():
        if messageDecode.message['payloadType'] == 310:  # barrage (danmaku) message
            messageDecode.feed_decode()
            del messageDecode.message['compressionType'], messageDecode.message['payload']
            # if messageDecode.message.get('displayWatchingCount'):
            #     redis_connection.hset(stream_barrage_stat_hash_key, 'displayWatchingCount:' + ws.live_stream_id,
            #                           messageDecode.message['displayWatchingCount'])
            #     print("Watching count: " + message.message['displayWatchingCount'])
            # if messageDecode.message.get('displayLikeCount'):
            #     redis_connection.hset(stream_barrage_stat_hash_key, 'displayLikeCount:' + ws.live_stream_id,
            #                           messageDecode.message['displayLikeCount'])
            #     print("Like count: " + message.message['displayLikeCount'])
            messageDecode.message['live_stream_id'] = ws.live_stream_id
            redis_connection.lpush(stream_barrage_data_key, json.dumps(messageDecode.message))
            print(ws.live_stream_id + ' captured barrage message')
            #
            # persistent_mysql = PersistentMysql()
            # persistent_mysql.live_stream_id = ws.live_stream_id
            # persistent_mysql.created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            # persistent_mysql.batch_insert(messageDecode.message)
            # print(ws.live_stream_id + " closing connection proactively")
            # ws.close()
            # sys.stdout.flush()
            return True
        # elif messageDecode.message['payloadType'] == 381:  # PK
        #     messageDecode.pk_stat_decode()
        #     messageDecode.message['live_stream_id'] = ws.live_stream_id
        #     redis_connection.lpush('OnliveAdmin:barrageScraperPkStatData', json.dumps(messageDecode.message))
        #     print(messageDecode.message['pkPlayerStatistic'])
        #     sys.stdout.flush()
        #     return True
    # If a stop command has been issued for this stream, exit the thread.
    if redis_connection.sismember(waiting_exit_stream_key, ws.live_stream_id):
        redis_connection.srem(waiting_exit_stream_key, ws.live_stream_id)
        print(ws.live_stream_id + " scraping stopped manually")
        ws.close()


def on_error(ws, error):
    redis_connection.zrem(scraping_stream_key, ws.live_stream_id)
    # error is usually an exception instance, so cast it to str before concatenating.
    print(ws.live_stream_id + ' ' + str(error))
    sys.stdout.flush()


def on_close(ws):
    redis_connection.zrem(scraping_stream_key, ws.live_stream_id)
    redis_connection.lpush(live_ended_key, ws.live_stream_id)
    print(ws.live_stream_id + " live ended " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
    sys.stdout.flush()


def on_open(ws):
    # Handshake frame: fixed header + token + stream ID + page ID, sent as a binary frame.
    part1 = [0x08, 0xC8, 0x01, 0x1A, 0xDC, 0x01, 0x0A, 0xAC, 0x01]  # constant header
    part2 = [ord(c) for c in ws.token]
    part3 = [0x12, 0x0B]
    part4 = [ord(c) for c in ws.live_stream_id]
    part5 = [0x3A, 0x1E]
    page_id = get_page_id()
    part6 = [ord(c) for c in page_id]
    d = part1 + part2 + part3 + part4 + part5 + part6
    # websocket-client expects bytes (or str), so convert the list of byte values.
    ws.send(bytes(d), websocket.ABNF.OPCODE_BINARY)

    def run():
        while True:
            time.sleep(20)
            if ws.live_stream_id not in get_active_thread_set():
                print('Stopping heartbeat: ' + ws.live_stream_id)
                sys.exit(0)
            print('Heartbeat: ' + ws.live_stream_id)
            # Heartbeat frame: fixed header + current timestamp in milliseconds.
            head = [0x08, 0x01, 0x1A, 0x07, 0x08]
            timestamp = int(time.time() * 1000)
            time_arr = MessageDecode.hex_(timestamp)
            heartbeat = head + time_arr
            ws.send(bytes(heartbeat), websocket.ABNF.OPCODE_BINARY)
    thread.start_new_thread(run, ())


def scrape(live_stream_id):
    print("%s starting" % live_stream_id)
    # Record that this stream is currently being scraped.
    redis_connection.zadd(scraping_stream_key, {live_stream_id: int(time.time())})
    websocket.enableTrace(False)
    tokenInfo = redis_connection.hget(scrape_live_token_hash_key, live_stream_id)
    if (tokenInfo is None) or (tokenInfo == ''):
        url, token = get_websocket_info(live_stream_id)
        if url is None:
            return False
        redis_connection.hset(scrape_live_token_hash_key, live_stream_id, json.dumps({
            'url': url,
            'token': token,
        }))
        print(live_stream_id + " got websocket url:\n" + url)
        print(token)
    else:
        tokenInfoObj = json.loads(tokenInfo)
        url = tokenInfoObj.get('url')
        token = tokenInfoObj.get('token')
        print(live_stream_id + " got websocket url from cache:\n" + url)
        print(token)
    sys.stdout.flush()
    try:
        ws = websocket.WebSocketApp(
            url,
            on_message=on_message,
            on_error=on_error,
            on_close=on_close
        )
        ws.on_open = on_open
        ws.live_stream_id = live_stream_id
        ws.token = token
        ws.run_forever(skip_utf8_validation=True)
    except Exception as e:
        print(e)
        ws.close()


# If too many scraping threads are running, wait for a while.
def check_threading_count():
    can_scrape_max_threading = redis_connection.get(can_scrape_max_threading_key)
    if can_scrape_max_threading is None:
        can_scrape_max_threading = 0
    if threading.active_count() > int(can_scrape_max_threading):
        print('### Too many barrage scraping tasks, please wait... ###')
        # Set the flag to 0 to temporarily forbid adding scraping tasks.
        redis_connection.set(can_scrape_sign_key, 0)
        time.sleep(0.6)
        return False
    # Set the flag to 1 to allow adding scraping tasks.
    redis_connection.set(can_scrape_sign_key, 1)
    return True


# Clean up stale data.
def clear_scraping_stream():
    # Streams Redis thinks are being scraped but that have no matching worker thread.
    active_list = redis_connection.zrange(scraping_stream_key, 0, -1)
    for live_stream_id in active_list:
        if live_stream_id not in get_active_thread_set():
            redis_connection.zrem(scraping_stream_key, live_stream_id)
    # Stop requests for streams that are no longer being scraped.
    waiting_exit_stream_set = redis_connection.smembers(waiting_exit_stream_key)
    for waiting_exit_stream_id in waiting_exit_stream_set:
        if waiting_exit_stream_id not in active_list:
            redis_connection.srem(waiting_exit_stream_key, waiting_exit_stream_id)


# Clear the "being scraped" list recorded in Redis.
def clear_up_scraping_stream():
    # If the list still has entries (usually left over from an abnormal exit),
    # push them back onto the waiting queue so they get scraped again.
    active_list = redis_connection.zrange(scraping_stream_key, 0, -1)
    for live_stream_id in active_list:
        redis_connection.zrem(scraping_stream_key, live_stream_id)
        redis_connection.rpush(waiting_scrape_stream_key, live_stream_id)


def get_active_thread_set():
    # Names of the worker threads currently scraping; each thread is named after its stream ID.
    active_thread_set = set()
    for active_thread in threading.enumerate():  # all live threads
        if active_thread.getName() != 'MainThread':
            active_thread_set.add(active_thread.getName())
    return active_thread_set


if __name__ == "__main__":
    print("Main started")
    # redis_connection = redis.StrictRedis(host='127.0.0.1', port=6379, decode_responses=True, db=1)
    redis_connection = redis.Redis(
        host='r-2ze28bdb7389a8a4.redis.rds.aliyuncs.com',
        port=6379,
        password='Zhuaduoduo2017',
        decode_responses=True
    )
    # List of live stream IDs waiting to be scraped
    # waiting_scrape_stream_key = 'barrageScraperWaitingStreamIdList'
    waiting_scrape_stream_key = 'BrandLiveData.BarrageScraperWaitingStreamIdList'
    # Sorted set of live stream IDs currently being scraped
    # scraping_stream_key = 'barrageScraperScrapingStreamIdSortSet'
    scraping_stream_key = 'BrandLiveData.BarrageScraperScrapingStreamIdSortSet'
    # cookie
    scrape_cookie_key = 'barrageScraperCookieSortSet'
    # token
    scrape_live_token_hash_key = 'BrandLiveData.BarrageScraperLiveTokenHash'
    # Set of live stream IDs waiting to be closed
    waiting_exit_stream_key = 'BrandLiveData.WaitingExitStreamIdSet'
    # Maximum number of scraping threads (string)
    can_scrape_max_threading_key = 'BrandLiveData.BarrageScrapeMaxThreadingKey'
    # Flag for whether new scraping tasks may be added (set to 0 when the thread limit is reached)
    can_scrape_sign_key = 'BrandLiveData.BarrageScrapeCanScrapeSignKey'
    # Watching-count and like-count stats captured per live stream
    stream_barrage_stat_hash_key = 'BrandLiveData.BarrageScrapeLiveStreamStatHash'
    # Raw barrage data captured from the websocket
    stream_barrage_data_key = 'BrandLiveData.BarrageScraperBarrageData'
    # Live streams that have ended
    live_ended_key = 'BrandLiveData.LiveEndedKey'
    # First clean up the "being scraped" list in Redis, usually left over from an abnormal exit.
    clear_up_scraping_stream()
    while True:
        # Check whether the number of scraping threads exceeds the configured maximum.
        if check_threading_count() is False:
            continue
        # Pop one live stream ID from the waiting list.
        stream_id = redis_connection.rpop(waiting_scrape_stream_key)
        # If there is nothing to scrape, sleep briefly and loop again.
        if stream_id is None:
            # print("### Waiting for live streams to scrape... ###")
            time.sleep(0.6)
            continue
        # Skip this stream ID if a thread with the same name is already scraping it.
        if stream_id in get_active_thread_set():
            print("%s is already running" % stream_id)
            continue
        task = threading.Thread(target=scrape, args=(stream_id,), name=stream_id, daemon=False)
        task.start()  # ready; waiting for the CPU to schedule it
        # Clean up stale data.
        clear_scraping_stream()
        time.sleep(1)
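
For reference, below is a minimal driver sketch showing how another process could feed this scraper through the same Redis keys. It assumes the same Redis instance and the key names defined above; the host shown and the stream ID value are placeholders, not values from the original script.

import redis

# Connect to the Redis instance the scraper uses (host here is a placeholder).
r = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)

# Cap the number of concurrent scraping threads (example value).
r.set('BrandLiveData.BarrageScrapeMaxThreadingKey', 50)

# Optionally seed a cookie; the score tracks how often the cookie has failed.
# r.zadd('barrageScraperCookieSortSet', {'<kuaishou cookie string>': 0})

# Queue a live stream ID; the scraper pops with rpop, so lpush keeps FIFO order.
r.lpush('BrandLiveData.BarrageScraperWaitingStreamIdList', 'example_stream_id')

# Later, ask the scraper to stop that stream.
r.sadd('BrandLiveData.WaitingExitStreamIdSet', 'example_stream_id')

# Consume one captured barrage message pushed by on_message.
item = r.rpop('BrandLiveData.BarrageScraperBarrageData')
print(item)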