#!/usr/bin/python3
# -*- coding: utf-8 -*-
import random
import websocket
import redis
import requests
import threading
import sys
import json
from barrage import MessageDecode
from libs.proxy import Proxy

try:
    import thread
except ImportError:
    import _thread as thread
import time


def get_rand_str(length):
    charset = "bjectSymhasOwnProp0123456789ABCDEFGHIJKLMNQRTUVWXYZdfgiklquvxz"
    rand_str = ''
    for _ in range(0, length):
        rand_str += random.choice(charset)
    return rand_str


def get_page_id():
    page_id = get_rand_str(16)
    page_id += "_"
    page_id += str(int(time.time() * 1000))
    return page_id


def get_websocket_info(live_stream_id, retry=0):
    param_json = {
        "operationName": "WebSocketInfoQuery",
        "variables": {
            "liveStreamId": live_stream_id
        },
        "query": "query WebSocketInfoQuery($liveStreamId: String) {\n webSocketInfo(liveStreamId: $liveStreamId) {\n token\n webSocketUrls\n __typename\n }\n}\n"
    }
    user_agents = {
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3908.2 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/82.0.4055.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36',
    }
    cookie_list = [
        'web_8f5a71d99d502219d7ce1ca1ffec68cc; clientid=3; client_key=65890b29; Hm_lvt_86a27b7db2c5c0ae37fee4a8a35033ee=1595212029,1597053525; userId=1089887853; kuaishou.live.bfb1s=477cb0011daca84b36b3a4676857e5a1; didv=1603703368000; kpn=GAME_ZONE; kuaishou.live.web_st=ChRrdWFpc2hvdS5saXZlLndlYi5zdBKgATKQJmTG8mk6lH2pOJeVa-184Sq95qCmEB1lP_q3lItdj6a6gnYgCIcPkt1ZvnJlE3cAkBVIiK46xUEyJ1SZe2hatA7MGwchvIhNlvumZQWfwUHwDOKWRkpNOn-4c0wYnHmgituvkv2B2XJC1lnaLBJ57zZU6iKrtgtXyekfNrf37VW3n0cpGF8LfjxvfQibwYfxewf2uOhDQzKnCA-kLosaEhqtgBlt9k0TlHg5Y4Goo9D9kyIgPsuUj-GlXMBsywOViBgpl-tt7OVb051_AZAHST9ItzAoBTAB; kuaishou.live.web_ph=8f83f1ed0c54c288352d86b3a23fe996499e; userId=1089887853',
        'kuaishou.live.bfb1s=7206d814e5c089a58c910ed8bf52ace5; clientid=3; client_key=65890b29; sid=83cf0fb9ee15fde6b6987781; did=web_c280cf49619a100d8305b6f095ed5348; userId=1509370340; didv=1600053596890; WEBLOGGER_HTTP_SEQ_ID=18928; WEBLOGGER_INCREAMENT_ID_KEY=19033; Hm_lvt_86a27b7db2c5c0ae37fee4a8a35033ee=1601391911; Hm_lpvt_86a27b7db2c5c0ae37fee4a8a35033ee=1602073621; kpn=GAME_ZONE; kuaishou.live.web_st=ChRrdWFpc2hvdS5saXZlLndlYi5zdBKgAcSZ7bO7s4CskEWV8oXNhqBeAW7qebb6Gv8_pmwLpeDLgzlIl3Nwgd2qal6895okBZxhkFFXQLIRvSJss1ALy5H2k0B9JvRZc9RNm4FzKmZ4eWPx2B3f-RLyiz6CkNpEsjcgpCQ90pQyBJgc9nWyo3vJRJS9fl53t2hFW29cvcHJktjSh4VRl-JrZ6WI3r3Et1kGxyh0wGqW3Q3xWzVMmIkaEsvpGUru20c-iIt7T0W8MQrXwiIgMbhYaFs2XBsAXpVKQoE6xmrFlpUw3VfiZaze7OM6mkEoBTAB; kuaishou.live.web_ph=6248010c3bbf0923d41c9d3c231b2903007e; userId=1509370340',
        'did=web_03d225bf290f905c1d1a5b3810af3b07; didv=1607676125725; kuaishou.live.bfb1s=9b8f70844293bed778aade6e0a8f9942; clientid=3; client_key=65890b29; kpn=GAME_ZONE; Hm_lvt_86a27b7db2c5c0ae37fee4a8a35033ee=1607676148; Hm_lpvt_86a27b7db2c5c0ae37fee4a8a35033ee=1607676148; userId=2020851346; userId=2020851346; kuaishou.live.web_st=ChRrdWFpc2hvdS5saXZlLndlYi5zdBKgAR00HvTWTmSGTYkxB2Mny84aCe3Lu8qGgMyP7Bhp7Y8lHqkC4TPLNt8H71ZesLiX4hpu9eTzOCiCmzXp7bCHQxzYt6_POWrTVT9L-jgFL4ET3JmTqrjxudwbSZQP-icUJACHgXEpf9a_BXvPS1RrBnl82Gyg1LwLXxIo0uIdwhheHrAp3u4h4C6S5lqWvZ7aNX1nDk_5Jsyq8UqsuCXLNiMaEgCrAu8bFEUPixNgRvVq1Nb0ZSIgKyTMpEkPIRE47p3L_U0hfzZ2IIBI9YfiD5jI34L2zmAoBTAB; kuaishou.live.web_ph=2f965caadcde0572313c46a7f8de50843381'
    ]
    # Take the cookie with the lowest score from the sorted set; fall back to a hard-coded cookie if the set is empty.
    res = redis_connection.zrangebyscore(scrape_cookie_key, 0, '+inf', start=0, num=1, withscores=True)
    if len(res) == 0:
        cookie = 'kuaishou.live.bfb1s=9b8f70844293bed778aade6e0a8f9942; clientid=3; did=web_2ba8ea2ca07df85a9e193a75d443d128; client_key=65890b29; kpn=GAME_ZONE; userId=1509370340; userId=1509370340; kuaishou.live.web_st=ChRrdWFpc2hvdS5saXZlLndlYi5zdBKgAZIqgEWU0YHOwebl3XEBuu7odxIyvSspUq1v_kvLGCVI_eURJXwii1_AN0CwuxScDBKoK2y2HIghAZLxMVzJhR0wQ4IHJ0zi3TtLsnrbNlA9VnnSZpuHK2b1M2RxEiFEKBRTK0AwnRzN7UOvkKQTfOIxtALOFXqSPOzJ4R8ScI30o_CqmgWPbg7dw0Kt5sgpGFYcZMCA1vgkFhiph-O4cE8aEjGueioax06vmORaF3eBQr3cQSIgsBvfCInpIGIqrTgvvo3648essPww4pkcAKNHOgeNHjUoBTAB; kuaishou.live.web_ph=d77227e666b43027931968af443b14bd7b28'
    else:
        cookie, times = res[0]
        print('Got cookie ' + str(int(times)) + ' ' + cookie[0:30])
    headers = {
        'user-agent': user_agents.pop(),
        'Cookie': cookie,
        'Referer': 'https://live.kuaishou.com/u/3xse' + live_stream_id
    }
    proxy = Proxy.get()
    print('Got proxy ' + proxy)
    proxies = {
        "http": "http://" + proxy,
        "https": "http://" + proxy
    }
    try:
        # https://live.kuaishou.com/live_graphql
        r = requests.post(
            "https://live.kuaishou.com/m_graphql",
            json=param_json,
            headers=headers,
            proxies=proxies,
            timeout=30
        )
        webSocketInfo = r.json()['data']['webSocketInfo']
        token = webSocketInfo.get('token')
        if token is None:
            # Bump this cookie's score so it is deprioritised next time, then retry.
            redis_connection.zincrby(scrape_cookie_key, 1, cookie)
            # Proxy.del_proxy(proxy)
            print('Failed to get token')
            # Give up after 10 retries.
            if retry > 10:
                return None, None
            time.sleep(2)
            retry = retry + 1
            return get_websocket_info(live_stream_id, retry)
        url = webSocketInfo['webSocketUrls'][0]
        return url, token
    except Exception as e:
        # Note: exceptions retry without increasing the retry counter.
        print('Exception while getting token: ' + str(e))
        return get_websocket_info(live_stream_id, retry)


def on_message(ws, message):
    global redis_connection
    # Convert the binary frame into a list of byte values for MessageDecode.
    data = [m for m in message]
    messageDecode = MessageDecode(data)
    if messageDecode.decode():
        if messageDecode.message['payloadType'] == 310:  # barrage (danmaku) message
            messageDecode.feed_decode()
            del messageDecode.message['compressionType'], messageDecode.message['payload']
            # if messageDecode.message.get('displayWatchingCount'):
            #     redis_connection.hset(stream_barrage_stat_hash_key, 'displayWatchingCount:' + ws.live_stream_id,
            #                           messageDecode.message['displayWatchingCount'])
            #     print("Viewer count: " + message.message['displayWatchingCount'])
            # if messageDecode.message.get('displayLikeCount'):
            #     redis_connection.hset(stream_barrage_stat_hash_key, 'displayLikeCount:' + ws.live_stream_id,
            #                           messageDecode.message['displayLikeCount'])
            #     print("Like count: " + message.message['displayLikeCount'])
            messageDecode.message['live_stream_id'] = ws.live_stream_id
            redis_connection.lpush(stream_barrage_data_key, json.dumps(messageDecode.message))
            print(ws.live_stream_id + '_captured barrage message')
            # persistent_mysql = PersistentMysql()
            # persistent_mysql.live_stream_id = ws.live_stream_id
            # persistent_mysql.created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            # persistent_mysql.batch_insert(messageDecode.message)
            # print(ws.live_stream_id + " closing connection proactively")
            # ws.close()
            # sys.stdout.flush()
            return True
        # elif messageDecode.message['payloadType'] == 381:  # PK
        #     messageDecode.pk_stat_decode()
        #     messageDecode.message['live_stream_id'] = ws.live_stream_id
        #     redis_connection.lpush('OnliveAdmin:barrageScraperPkStatData', json.dumps(messageDecode.message))
        #     print(messageDecode.message['pkPlayerStatistic'])
        #     sys.stdout.flush()
        #     return True

    # If an exit command is pending for this stream, stop this thread.
    if redis_connection.sismember(waiting_exit_stream_key, ws.live_stream_id):
        redis_connection.srem(waiting_exit_stream_key, ws.live_stream_id)
        print(ws.live_stream_id + " scraping stopped manually")
        ws.close()

def on_error(ws, error):
    redis_connection.zrem(scraping_stream_key, ws.live_stream_id)
    print(ws.live_stream_id + ' ' + str(error))
    sys.stdout.flush()


def on_close(ws):
    redis_connection.zrem(scraping_stream_key, ws.live_stream_id)
    redis_connection.lpush(live_ended_key, ws.live_stream_id)
    print(ws.live_stream_id + " live stream ended " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
    sys.stdout.flush()


def on_open(ws):
    # Handshake frame: fixed header + token + live stream id + page id, sent as a binary frame.
    part1 = [0x08, 0xC8, 0x01, 0x1A, 0xDC, 0x01, 0x0A, 0xAC, 0x01]  # fixed header
    part2 = [ord(c) for c in ws.token]
    part3 = [0x12, 0x0B]
    part4 = [ord(c) for c in ws.live_stream_id]
    part5 = [0x3A, 0x1E]
    page_id = get_page_id()
    part6 = [ord(c) for c in page_id]
    d = part1 + part2 + part3 + part4 + part5 + part6
    ws.send(d, websocket.ABNF.OPCODE_BINARY)

    def run():
        while True:
            time.sleep(20)
            if ws.live_stream_id not in get_active_thread_set():
                print('Stopping heartbeat: ' + ws.live_stream_id)
                sys.exit(0)
            print('Heartbeat: ' + ws.live_stream_id)
            # Send a heartbeat carrying the current timestamp in milliseconds.
            head = [0x08, 0x01, 0x1A, 0x07, 0x08]
            timestamp = int(time.time() * 1000)
            time_arr = MessageDecode.hex_(timestamp)
            heartbeat = head + time_arr
            ws.send(heartbeat, websocket.ABNF.OPCODE_BINARY)

    thread.start_new_thread(run, ())


def scrape(live_stream_id):
    print("%s started" % live_stream_id)
    # Record that this task is running.
    redis_connection.zadd(scraping_stream_key, {live_stream_id: int(time.time())})
    websocket.enableTrace(False)
    tokenInfo = redis_connection.hget(scrape_live_token_hash_key, live_stream_id)
    if (tokenInfo is None) or (tokenInfo == ''):
        url, token = get_websocket_info(live_stream_id)
        if url is None:
            return False
        redis_connection.hset(scrape_live_token_hash_key, live_stream_id, json.dumps({
            'url': url,
            'token': token,
        }))
        print(live_stream_id + " got websocket URL:\n" + url)
        print(token)
    else:
        tokenInfoObj = json.loads(tokenInfo)
        url = tokenInfoObj.get('url')
        token = tokenInfoObj.get('token')
        print(live_stream_id + " got websocket URL from cache:\n" + url)
        print(token)
    sys.stdout.flush()
    try:
        ws = websocket.WebSocketApp(
            url,
            on_message=on_message,
            on_error=on_error,
            on_close=on_close
        )
        ws.on_open = on_open
        ws.live_stream_id = live_stream_id
        ws.token = token
        ws.run_forever(skip_utf8_validation=True)
    except Exception as e:
        print(e)
        ws.close()


# Wait for a while if too many scraping threads are running.
def check_threading_count():
    can_scrape_max_threading = redis_connection.get(can_scrape_max_threading_key)
    if can_scrape_max_threading is None:
        can_scrape_max_threading = 0
    if threading.active_count() > int(can_scrape_max_threading):
        print('### Too many barrage scraping tasks, waiting...... ###')
        # Set the flag to 0 so no new scraping tasks are added for now.
        redis_connection.set(can_scrape_sign_key, 0)
        time.sleep(0.6)
        return False
    # Set the flag to 1 to allow new scraping tasks.
    redis_connection.set(can_scrape_sign_key, 1)
    return True

# Clean up stale entries.
def clear_scraping_stream():
    # Remove streams recorded as "scraping" in Redis that no longer have a live thread.
    active_list = redis_connection.zrange(scraping_stream_key, 0, -1)
    for live_stream_id in active_list:
        if live_stream_id not in get_active_thread_set():
            redis_connection.zrem(scraping_stream_key, live_stream_id)
    # Remove streams recorded as "waiting to exit" that are no longer being scraped.
    waiting_exit_stream_set = redis_connection.smembers(waiting_exit_stream_key)
    for waiting_exit_stream_id in waiting_exit_stream_set:
        if waiting_exit_stream_id not in active_list:
            redis_connection.srem(waiting_exit_stream_key, waiting_exit_stream_id)


# Clear the "scraping" list recorded in Redis.
def clear_up_scraping_stream():
    # Any stream still recorded as "scraping" is pushed back to the waiting list so it gets scraped again.
    active_list = redis_connection.zrange(scraping_stream_key, 0, -1)
    for live_stream_id in active_list:
        redis_connection.zrem(scraping_stream_key, live_stream_id)
        redis_connection.rpush(waiting_scrape_stream_key, live_stream_id)


def get_active_thread_set():
    # Names of the live scraping threads (each thread is named after its live stream ID).
    active_thread_set = set()
    for active_thread in threading.enumerate():
        if active_thread.getName() != 'MainThread':
            active_thread_set.add(active_thread.getName())
    return active_thread_set


if __name__ == "__main__":
    print("Main started")
    # redis_connection = redis.StrictRedis(host='127.0.0.1', port=6379, decode_responses=True, db=1)
    redis_connection = redis.Redis(
        host='r-2ze28bdb7389a8a4.redis.rds.aliyuncs.com',
        port=6379,
        password='Zhuaduoduo2017',
        decode_responses=True
    )

    # List of live stream IDs waiting to be scraped
    # waiting_scrape_stream_key = 'barrageScraperWaitingStreamIdList'
    waiting_scrape_stream_key = 'BrandLiveData.BarrageScraperWaitingStreamIdList'
    # Sorted set of live stream IDs currently being scraped
    # scraping_stream_key = 'barrageScraperScrapingStreamIdSortSet'
    scraping_stream_key = 'BrandLiveData.BarrageScraperScrapingStreamIdSortSet'
    # cookie
    scrape_cookie_key = 'barrageScraperCookieSortSet'
    # token
    scrape_live_token_hash_key = 'BrandLiveData.BarrageScraperLiveTokenHash'
    # Set of live stream IDs waiting to be stopped
    waiting_exit_stream_key = 'BrandLiveData.WaitingExitStreamIdSet'
    # Maximum number of scraping threads (string)
    can_scrape_max_threading_key = 'BrandLiveData.BarrageScrapeMaxThreadingKey'
    # Flag for whether new tasks may be added (set to 0 once the thread count reaches the maximum)
    can_scrape_sign_key = 'BrandLiveData.BarrageScrapeCanScrapeSignKey'
    # Viewer and like counts captured per live stream
    stream_barrage_stat_hash_key = 'BrandLiveData.BarrageScrapeLiveStreamStatHash'
    # Barrage data captured from the websocket
    stream_barrage_data_key = 'BrandLiveData.BarrageScraperBarrageData'
    # Live streams that have ended
    live_ended_key = 'BrandLiveData.LiveEndedKey'

    # Clean up the "scraping" list left in Redis first, usually the result of an abnormal exit.
    clear_up_scraping_stream()

    while True:
        # Skip this round if the number of scraping threads already exceeds the configured maximum.
        if check_threading_count() is False:
            continue

        # Pop one live stream ID from the waiting list.
        stream_id = redis_connection.rpop(waiting_scrape_stream_key)
        # If there is nothing to scrape, wait briefly and loop.
        if stream_id is None:
            # print("### Waiting for live streams to scrape...... ###")
            time.sleep(0.6)
            continue

        # Skip the stream if a thread with this name is already scraping it.
        if stream_id in get_active_thread_set():
            print("%s is already running" % stream_id)
            continue

        task = threading.Thread(target=scrape, args=(stream_id,), name=stream_id, daemon=False)
        task.start()  # ready to run; waits for the scheduler

        # Clean up stale entries.
        clear_scraping_stream()
        time.sleep(1)