- #!/usr/bin/python3
- # -*- coding: utf-8 -*-
- import random
- import websocket
- import redis
- import requests
- import threading
- import sys
- import json
- from barrage import MessageDecode
- from libs.proxy import Proxy
- try:
- import thread
- except ImportError:
- import _thread as thread
- import time
- def get_rand_str(length):
- # build a random string of the requested length from the same charset the Kuaishou web client uses
- charset = "bjectSymhasOwnProp0123456789ABCDEFGHIJKLMNQRTUVWXYZdfgiklquvxz"
- return ''.join(random.choice(charset) for _ in range(length))
- def get_page_id():
- page_id = get_rand_str(16)
- page_id += "_"
- page_id += str(int(time.time() * 1000))
- return page_id
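- # Example (illustrative values only) of what the helpers above generate:
- #   get_rand_str(16) -> 'kR3bfZ0dTqhGiNQx'
- #   get_page_id()    -> 'kR3bfZ0dTqhGiNQx_1620000000000'   (16 random chars + '_' + millisecond timestamp)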
- def get_websocket_info(live_stream_id, retry=0):
- param_json = {
- "operationName": "WebSocketInfoQuery",
- "variables": {
- "liveStreamId": live_stream_id
- },
- "query": "query WebSocketInfoQuery($liveStreamId: String) {\n webSocketInfo(liveStreamId: $liveStreamId) {\n token\n webSocketUrls\n __typename\n }\n}\n"
- }
- user_agents = {
- 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3908.2 Safari/537.36',
- 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36',
- 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/82.0.4055.0 Safari/537.36',
- 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36',
- }
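- # Note: user_agents is a set, so .pop() below removes an arbitrary entry on each call;
- # a list plus random.choice() would express the "pick any UA" intent more explicitly.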
- cookie_list = [  # note: this list is not referenced below; cookies are read from Redis instead
- 'web_8f5a71d99d502219d7ce1ca1ffec68cc; clientid=3; client_key=65890b29; Hm_lvt_86a27b7db2c5c0ae37fee4a8a35033ee=1595212029,1597053525; userId=1089887853; kuaishou.live.bfb1s=477cb0011daca84b36b3a4676857e5a1; didv=1603703368000; kpn=GAME_ZONE; kuaishou.live.web_st=ChRrdWFpc2hvdS5saXZlLndlYi5zdBKgATKQJmTG8mk6lH2pOJeVa-184Sq95qCmEB1lP_q3lItdj6a6gnYgCIcPkt1ZvnJlE3cAkBVIiK46xUEyJ1SZe2hatA7MGwchvIhNlvumZQWfwUHwDOKWRkpNOn-4c0wYnHmgituvkv2B2XJC1lnaLBJ57zZU6iKrtgtXyekfNrf37VW3n0cpGF8LfjxvfQibwYfxewf2uOhDQzKnCA-kLosaEhqtgBlt9k0TlHg5Y4Goo9D9kyIgPsuUj-GlXMBsywOViBgpl-tt7OVb051_AZAHST9ItzAoBTAB; kuaishou.live.web_ph=8f83f1ed0c54c288352d86b3a23fe996499e; userId=1089887853',
- 'kuaishou.live.bfb1s=7206d814e5c089a58c910ed8bf52ace5; clientid=3; client_key=65890b29; sid=83cf0fb9ee15fde6b6987781; did=web_c280cf49619a100d8305b6f095ed5348; userId=1509370340; didv=1600053596890; WEBLOGGER_HTTP_SEQ_ID=18928; WEBLOGGER_INCREAMENT_ID_KEY=19033; Hm_lvt_86a27b7db2c5c0ae37fee4a8a35033ee=1601391911; Hm_lpvt_86a27b7db2c5c0ae37fee4a8a35033ee=1602073621; kpn=GAME_ZONE; kuaishou.live.web_st=ChRrdWFpc2hvdS5saXZlLndlYi5zdBKgAcSZ7bO7s4CskEWV8oXNhqBeAW7qebb6Gv8_pmwLpeDLgzlIl3Nwgd2qal6895okBZxhkFFXQLIRvSJss1ALy5H2k0B9JvRZc9RNm4FzKmZ4eWPx2B3f-RLyiz6CkNpEsjcgpCQ90pQyBJgc9nWyo3vJRJS9fl53t2hFW29cvcHJktjSh4VRl-JrZ6WI3r3Et1kGxyh0wGqW3Q3xWzVMmIkaEsvpGUru20c-iIt7T0W8MQrXwiIgMbhYaFs2XBsAXpVKQoE6xmrFlpUw3VfiZaze7OM6mkEoBTAB; kuaishou.live.web_ph=6248010c3bbf0923d41c9d3c231b2903007e; userId=1509370340',
- 'did=web_03d225bf290f905c1d1a5b3810af3b07; didv=1607676125725; kuaishou.live.bfb1s=9b8f70844293bed778aade6e0a8f9942; clientid=3; client_key=65890b29; kpn=GAME_ZONE; Hm_lvt_86a27b7db2c5c0ae37fee4a8a35033ee=1607676148; Hm_lpvt_86a27b7db2c5c0ae37fee4a8a35033ee=1607676148; userId=2020851346; userId=2020851346; kuaishou.live.web_st=ChRrdWFpc2hvdS5saXZlLndlYi5zdBKgAR00HvTWTmSGTYkxB2Mny84aCe3Lu8qGgMyP7Bhp7Y8lHqkC4TPLNt8H71ZesLiX4hpu9eTzOCiCmzXp7bCHQxzYt6_POWrTVT9L-jgFL4ET3JmTqrjxudwbSZQP-icUJACHgXEpf9a_BXvPS1RrBnl82Gyg1LwLXxIo0uIdwhheHrAp3u4h4C6S5lqWvZ7aNX1nDk_5Jsyq8UqsuCXLNiMaEgCrAu8bFEUPixNgRvVq1Nb0ZSIgKyTMpEkPIRE47p3L_U0hfzZ2IIBI9YfiD5jI34L2zmAoBTAB; kuaishou.live.web_ph=2f965caadcde0572313c46a7f8de50843381'
- ]
- res = redis_connection.zrangebyscore(scrape_cookie_key, 0, '+inf', start=0, num=1, withscores=True)
- if len(res) == 0:
- cookie = 'kuaishou.live.bfb1s=9b8f70844293bed778aade6e0a8f9942; clientid=3; did=web_2ba8ea2ca07df85a9e193a75d443d128; client_key=65890b29; kpn=GAME_ZONE; userId=1509370340; userId=1509370340; kuaishou.live.web_st=ChRrdWFpc2hvdS5saXZlLndlYi5zdBKgAZIqgEWU0YHOwebl3XEBuu7odxIyvSspUq1v_kvLGCVI_eURJXwii1_AN0CwuxScDBKoK2y2HIghAZLxMVzJhR0wQ4IHJ0zi3TtLsnrbNlA9VnnSZpuHK2b1M2RxEiFEKBRTK0AwnRzN7UOvkKQTfOIxtALOFXqSPOzJ4R8ScI30o_CqmgWPbg7dw0Kt5sgpGFYcZMCA1vgkFhiph-O4cE8aEjGueioax06vmORaF3eBQr3cQSIgsBvfCInpIGIqrTgvvo3648essPww4pkcAKNHOgeNHjUoBTAB; kuaishou.live.web_ph=d77227e666b43027931968af443b14bd7b28'
- else:
- cookie, times = res[0]
- print('got cookie ' + str(int(times)) + ' ' + cookie[0:30])
- headers = {
- 'user-agent': user_agents.pop(),
- 'Cookie': cookie,
- 'Referer': 'https://live.kuaishou.com/u/3xse' + live_stream_id
- }
- proxy = Proxy.get()
- print('got proxy ' + proxy)
- proxies = {
- "http": "http://" + proxy,
- "https": "http://" + proxy
- }
- try:
- # https://live.kuaishou.com/live_graphql
- r = requests.post(
- "https://live.kuaishou.com/m_graphql",
- json=param_json,
- headers=headers,
- proxies=proxies,
- timeout=30
- )
- webSocketInfo = r.json()['data']['webSocketInfo']
- token = webSocketInfo.get('token')
- if token is None:
- redis_connection.zincrby(scrape_cookie_key, 1, cookie)
- # Proxy.del_proxy(proxy)
- print('failed to get token')
- # give up after more than 10 failed retries
- if retry > 10:
- return None, None
- time.sleep(2)
- retry = retry + 1
- return get_websocket_info(live_stream_id, retry)
- url = webSocketInfo['webSocketUrls'][0]
- return url, token
- except Exception as e:
- print('exception while fetching token: ' + str(e))
- # cap retries here as well so a persistent failure cannot recurse forever
- if retry > 10:
- return None, None
- return get_websocket_info(live_stream_id, retry + 1)
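- # For reference, a successful m_graphql response is expected to look roughly like this
- # (field names come from the GraphQL query above; values here are placeholders):
- # {"data": {"webSocketInfo": {"token": "<long token string>", "webSocketUrls": ["wss://..."], "__typename": "WebSocketInfo"}}}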
- def on_message(ws, message):
- global redis_connection
- data = list(message)  # raw binary frame as a list of byte values
- messageDecode = MessageDecode(data)
- if messageDecode.decode():
- if messageDecode.message['payloadType'] == 310:  # barrage (danmaku) messages
- messageDecode.feed_decode()
- del messageDecode.message['compressionType'], messageDecode.message['payload']
- # if messageDecode.message.get('displayWatchingCount'):
- # redis_connection.hset(stream_barrage_stat_hash_key, 'displayWatchingCount:' + ws.live_stream_id,
- # messageDecode.message['displayWatchingCount'])
- # print("观看人数:" + message.message['displayWatchingCount'])
- # if messageDecode.message.get('displayLikeCount'):
- # redis_connection.hset(stream_barrage_stat_hash_key, 'displayLikeCount:' + ws.live_stream_id,
- # messageDecode.message['displayLikeCount'])
- # print("点赞人数:" + message.message['displayLikeCount'])
- messageDecode.message['live_stream_id'] = ws.live_stream_id
- redis_connection.lpush(stream_barrage_data_key, json.dumps(messageDecode.message))
- print(ws.live_stream_id + ' barrage message captured')
- #
- # persistent_mysql = PersistentMysql()
- # persistent_mysql.live_stream_id = ws.live_stream_id
- # persistent_mysql.created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
- # persistent_mysql.batch_insert(messageDecode.message)
- # print(ws.live_stream_id + "主动断开连接")
- # ws.close()
- # sys.stdout.flush()
- return True
- # elif messageDecode.message['payloadType'] == 381: # PK
- # messageDecode.pk_stat_decode()
- # messageDecode.message['live_stream_id'] = ws.live_stream_id
- # redis_connection.lpush('OnliveAdmin:barrageScraperPkStatData', json.dumps(messageDecode.message))
- # print(messageDecode.message['pkPlayerStatistic'])
- # sys.stdout.flush()
- # return True
- # if an exit command has been issued for this stream, stop this thread
- if redis_connection.sismember(waiting_exit_stream_key, ws.live_stream_id):
- redis_connection.srem(waiting_exit_stream_key, ws.live_stream_id)
- print(ws.live_stream_id + " scraping stopped manually")
- ws.close()
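- # A downstream consumer (sketch, not part of this script) could drain the queue filled by
- # on_message, for example:
- #     item = redis_connection.rpop(stream_barrage_data_key)
- #     if item:
- #         barrage = json.loads(item)
- #         print(barrage['live_stream_id'])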
- def on_error(ws, error):
- redis_connection.zrem(scraping_stream_key, ws.live_stream_id)
- print(ws.live_stream_id + ' ' + str(error))
- sys.stdout.flush()
- def on_close(ws):
- redis_connection.zrem(scraping_stream_key, ws.live_stream_id)
- redis_connection.lpush(live_ended_key, ws.live_stream_id)
- print(ws.live_stream_id + " 直播已结束 " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
- sys.stdout.flush()
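- # Note: this callback signature matches older websocket-client releases; versions 1.x call
- # on_close(ws, close_status_code, close_msg), so extra parameters may be needed there.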
- def on_open(ws):
- # handshake frame: fixed header bytes + token + live stream id + page id
- part1 = [0x08, 0xC8, 0x01, 0x1A, 0xDC, 0x01, 0x0A, 0xAC, 0x01]  # fixed header
- part2 = [ord(c) for c in ws.token]
- part3 = [0x12, 0x0B]  # marker bytes before the live stream id
- part4 = [ord(c) for c in ws.live_stream_id]
- part5 = [0x3A, 0x1E]  # marker bytes before the page id
- page_id = get_page_id()
- part6 = [ord(c) for c in page_id]
- d = part1 + part2 + part3 + part4 + part5 + part6
- ws.send(d, websocket.ABNF.OPCODE_BINARY)
- def run():
- while True:
- time.sleep(20)
- if ws.live_stream_id not in get_active_thread_set():
- print('stopping heartbeat: ' + ws.live_stream_id)
- sys.exit(0)
- print('heartbeat: ' + ws.live_stream_id)
- # heartbeat payload: fixed head bytes plus the current timestamp in milliseconds
- head = [0x08, 0x01, 0x1A, 0x07, 0x08]
- timestamp = int(time.time() * 1000)
- time_arr = MessageDecode.hex_(timestamp)
- heartbeat = head + time_arr
- ws.send(heartbeat, websocket.ABNF.OPCODE_BINARY)
- thread.start_new_thread(run, ())
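- # Heartbeat format note (inferred from the code above, not from a protocol spec): every 20 seconds
- # the frame [0x08, 0x01, 0x1A, 0x07, 0x08] plus the byte list MessageDecode.hex_() produces for the
- # millisecond timestamp is sent as a binary websocket frame.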
- def scrape(live_stream_id):
- print("%s 开始执行" % live_stream_id)
- # 记录当前任务正在运行
- redis_connection.zadd(scraping_stream_key, {live_stream_id: int(time.time())})
- websocket.enableTrace(False)
- tokenInfo = redis_connection.hget(scrape_live_token_hash_key, live_stream_id)
- if (tokenInfo is None) or (tokenInfo == ''):
- url, token = get_websocket_info(live_stream_id)
- if url is None:
- return False
- redis_connection.hset(scrape_live_token_hash_key, live_stream_id, json.dumps({
- 'url': url,
- 'token': token,
- }))
- print(live_stream_id + " 获取Url成功:\n" + url)
- print(token)
- else:
- tokenInfoObj = json.loads(tokenInfo)
- url = tokenInfoObj.get('url')
- token = tokenInfoObj.get('token')
- print(live_stream_id + " 缓存获取Url成功:\n" + url)
- print(token)
- sys.stdout.flush()
- try:
- ws = websocket.WebSocketApp(
- url,
- on_message=on_message,
- on_error=on_error,
- on_close=on_close
- )
- ws.on_open = on_open
- ws.live_stream_id = live_stream_id
- ws.token = token
- ws.run_forever(skip_utf8_validation=True)
- except Exception as e:
- print(e)
- ws.close()
- # wait while too many scraper threads are running
- def check_threading_count():
- can_scrape_max_threading = redis_connection.get(can_scrape_max_threading_key)
- if can_scrape_max_threading is None:
- can_scrape_max_threading = 0
- # if the number of scraper threads exceeds the limit, wait for a while
- if threading.active_count() > int(can_scrape_max_threading):
- print('### too many barrage scraping tasks, waiting... ###')
- # set the flag to 0 to temporarily block new scraping tasks
- redis_connection.set(can_scrape_sign_key, 0)
- time.sleep(0.6)
- return False
- # set the flag to 1 to allow new scraping tasks
- redis_connection.set(can_scrape_sign_key, 1)
- return True
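- # The thread cap is re-read from Redis on every pass, so an operator can adjust it at runtime,
- # e.g. redis_connection.set(can_scrape_max_threading_key, 50); if the key is missing, the
- # fallback of 0 effectively pauses the launch of new scraping tasks.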
- # clean up stale data
- def clear_scraping_stream():
- # fetch the running-thread list recorded in Redis
- active_list = redis_connection.zrange(scraping_stream_key, 0, -1)
- for live_stream_id in active_list:
- if live_stream_id not in get_active_thread_set():
- redis_connection.zrem(scraping_stream_key, live_stream_id)
- # fetch the set of streams waiting to be stopped recorded in Redis
- waiting_exit_stream_set = redis_connection.smembers(waiting_exit_stream_key)
- for waiting_exit_stream_id in waiting_exit_stream_set:
- if waiting_exit_stream_id not in active_list:
- redis_connection.srem(waiting_exit_stream_key, waiting_exit_stream_id)
- # clear the running-thread list recorded in Redis
- def clear_up_scraping_stream():
- # any stream ids still marked as running are removed and re-queued for scraping
- active_list = redis_connection.zrange(scraping_stream_key, 0, -1)
- for live_stream_id in active_list:
- redis_connection.zrem(scraping_stream_key, live_stream_id)
- redis_connection.rpush(waiting_scrape_stream_key, live_stream_id)
- def get_active_thread_set():
- # names of the active non-main threads, i.e. the live stream ids currently being scraped
- active_thread_set = set()
- for active_thread in threading.enumerate():  # all live threads
- if active_thread.getName() != 'MainThread':
- active_thread_set.add(active_thread.getName())
- return active_thread_set
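- # Note: scrape threads are created with name=stream_id in the main loop below, so the set returned
- # here can be compared directly against live stream ids.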
- if __name__ == "__main__":
- print("主方法开始执行")
- # redis_connection = redis.StrictRedis(host='127.0.0.1', port=6379, decode_responses=True, db=1)
- redis_connection = redis.Redis(
- host='r-2ze28bdb7389a8a4.redis.rds.aliyuncs.com',
- port=6379,
- password='Zhuaduoduo2017',
- decode_responses=True
- )
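- # decode_responses=True makes redis-py return str instead of bytes, which the string
- # concatenation and json.loads calls in this script rely on.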
- # list of live stream ids waiting to be scraped
- # waiting_scrape_stream_key = 'barrageScraperWaitingStreamIdList'
- waiting_scrape_stream_key = 'BrandLiveData.BarrageScraperWaitingStreamIdList'
- # sorted set of live stream ids currently being scraped
- # scraping_stream_key = 'barrageScraperScrapingStreamIdSortSet'
- scraping_stream_key = 'BrandLiveData.BarrageScraperScrapingStreamIdSortSet'
- # cookie
- scrape_cookie_key = 'barrageScraperCookieSortSet'
- # token
- scrape_live_token_hash_key = 'BrandLiveData.BarrageScraperLiveTokenHash'
- # set of live stream ids waiting to be closed
- waiting_exit_stream_key = 'BrandLiveData.WaitingExitStreamIdSet'
- # maximum number of scraper threads (string)
- can_scrape_max_threading_key = 'BrandLiveData.BarrageScrapeMaxThreadingKey'
- # flag for whether new scraping tasks may be added (set to 0 once the thread count hits the maximum)
- can_scrape_sign_key = 'BrandLiveData.BarrageScrapeCanScrapeSignKey'
- # watching and like counts captured per live stream
- stream_barrage_stat_hash_key = 'BrandLiveData.BarrageScrapeLiveStreamStatHash'
- # barrage data captured from the websocket
- stream_barrage_data_key = 'BrandLiveData.BarrageScraperBarrageData'
- # live streams that have ended
- live_ended_key = 'BrandLiveData.LiveEndedKey'
- # first clean up the running-thread list left in Redis, usually from an earlier abnormal exit
- clear_up_scraping_stream()
- while True:
- # check whether the number of scraper threads exceeds the configured maximum
- if check_threading_count() is False:
- continue
- # pop one live stream id from the waiting list
- stream_id = redis_connection.rpop(waiting_scrape_stream_key)
- # if there is nothing to scrape, wait briefly and loop again
- if stream_id is None:
- # print("### waiting for barrage scraping tasks... ###")
- time.sleep(0.6)
- continue
- # skip this stream id if a thread with that name already exists, i.e. it is already being scraped
- if stream_id in get_active_thread_set():
- print("%s is already running" % stream_id)
- continue
- task = threading.Thread(target=scrape, args=(stream_id,), name=stream_id, daemon=False)
- task.start()  # ready to run, waiting for the CPU to schedule it
- # clean up stale data
- clear_scraping_stream()
- time.sleep(1)
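- # Producer side (sketch, assuming the same Redis keys): another process can enqueue a live stream
- # for scraping or ask for it to stop, e.g. with an illustrative id 'AbCdEfGhIjK':
- #     redis_connection.lpush(waiting_scrape_stream_key, 'AbCdEfGhIjK')   # start scraping
- #     redis_connection.sadd(waiting_exit_stream_key, 'AbCdEfGhIjK')      # stop scraping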
|